Skip to content

Commit

Permalink
Revert "Jason work to fix worker priority scheduling"
Browse files Browse the repository at this point in the history
This reverts commit 29ad419.

revert bad merge conflict
  • Loading branch information
zef committed Dec 17, 2024
1 parent 29ad419 commit 4ec0a8f
Showing 1 changed file with 156 additions and 174 deletions.
330 changes: 156 additions & 174 deletions cws-core/src/main/java/jpl/cws/core/db/SchedulerDbService.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,15 @@ public class SchedulerDbService extends DbService implements InitializingBean {
public static final int DEFAULT_WORKER_PROC_DEF_MAX_INSTANCES = 1;
public static final int PROCESSES_PAGE_SIZE = 50;

public static final String FIND_CLAIMABLE_ROWS_SQL =
"SELECT uuid, priority FROM cws_sched_worker_proc_inst " +
"WHERE " +
" status='"+PENDING+"' AND " +
" proc_def_key=? " +
"ORDER BY " +
" priority ASC, " + // lower priorities favored
" created_time ASC " + // older dates (FIFO) favored
"LIMIT ?";
public static final String FIND_CLAIMABLE_ROWS_SQL =
"SELECT uuid FROM cws_sched_worker_proc_inst " +
"WHERE " +
" status='" + PENDING + "' AND " +
" proc_def_key=? " +
"ORDER BY " +
" priority ASC, " + // lower priorities favored
" created_time ASC " + // older dates (FIFO) favored
"LIMIT ?";

public static final String UPDATE_CLAIMABLE_ROW_SQL =
"UPDATE cws_sched_worker_proc_inst " +
Expand Down Expand Up @@ -242,173 +242,161 @@ public int updateProcInstIdAndStartedByWorker(


/**
* Attempt to claim a process start request in the database.
*
* @param workerProcsList -- attempts to claim rows for the active set of process definition(s)
* @return mappings of claimUuids and claimedRowUuids
*
*/

public Map<String,List<String>> claimHighestPriorityStartReq(String workerId, Map<String,Integer> workerProcsList, Map<String,Integer> limitsPerProcs, int limit) {
List<String> claimUuids = new ArrayList<String>();
List<String> rowUuids = new ArrayList<String>();
List<Map<String, Object>> rowUuidsPerProcDefKey = new ArrayList<Map<String, Object>>();
LinkedHashMap<String, String> uuidAndProcDefKeyPair = new LinkedHashMap<String, String>();
List<String> clearOutUnclaimedInst = new ArrayList<String>();
List<Map<String, Object>> unfilteredProcesses = new ArrayList<Map<String, Object>>();
List<String> claimedRowUuids = new ArrayList<String>();
long t0 = System.currentTimeMillis();
int numClaimed = 0;
String claimUuid = null;
int attempts = 0;

// Try, until succeeding in claiming at least one row
//
while (attempts++ < 10) {
try {
// Find claimable rows
//
for (Map.Entry<String, Integer> procs : limitsPerProcs.entrySet()) {
rowUuidsPerProcDefKey = jdbcTemplate.queryForList(FIND_CLAIMABLE_ROWS_SQL, new Object[] {procs.getKey(), procs.getValue()*2});
// get list of uuids using array of procdefkeys IN (keys)
unfilteredProcesses.addAll(rowUuidsPerProcDefKey);
}

unfilteredProcesses.sort(new Comparator<Map<String, Object>>() {
public int compare(Map<String, Object> one, Map<String, Object> two) {
return ((Integer) one.get("priority")).compareTo((Integer) two.get("priority"));
* Attempt to claim a process start request in the database.
*
* @param workerProcsList -- attempts to claim rows for the active set of process definition(s)
* @return mappings of claimUuids and claimedRowUuids
*/

public Map<String, List<String>> claimHighestPriorityStartReq(String workerId, Map<String, Integer> workerProcsList, Map<String, Integer> limitsPerProcs, int limit) {
List<String> claimUuids = new ArrayList<String>();
List<String> rowUuids = new ArrayList<String>();
List<String> rowUuidsPerProcDefKey = new ArrayList<String>();
LinkedHashMap<String, String> uuidAndProcDefKeyPair = new LinkedHashMap<String, String>();
List<String> clearOutUnclaimedInst = new ArrayList<String>();
List<String> unfilteredRowUuids = new ArrayList<String>();
List<String> claimedRowUuids = new ArrayList<String>();
long t0 = System.currentTimeMillis();
int numClaimed = 0;
String claimUuid = null;
int attempts = 0;

// Try, until succeeding in claiming at least one row
//
while (attempts++ < 10) {
try {
// Find claimable rows
//
for (Map.Entry<String, Integer> procs : limitsPerProcs.entrySet()) {
rowUuidsPerProcDefKey = jdbcTemplate.queryForList(FIND_CLAIMABLE_ROWS_SQL, String.class,
new Object[]{procs.getKey(), procs.getValue() * 2});
// get list of uuids using array of procdefkeys IN (keys)
unfilteredRowUuids.addAll(rowUuidsPerProcDefKey);
}

Collections.sort(unfilteredRowUuids);
for (String id : unfilteredRowUuids) {
String procDefKeyString = getProcDefKeyFromUuid(id);
uuidAndProcDefKeyPair.put(id, procDefKeyString);
}

for (Map.Entry<String, Integer> procLimit : limitsPerProcs.entrySet()) {
Set<String> keys = uuidAndProcDefKeyPair.keySet();
int applyPerProcsCap = 0;
for (String key : keys) {

if (uuidAndProcDefKeyPair.get(key).equals(procLimit.getKey())) {
applyPerProcsCap = applyPerProcsCap + 1;
if (applyPerProcsCap > procLimit.getValue()) {
clearOutUnclaimedInst.add(key);
}
}
}
});

for (Map<String, Object> proc : unfilteredProcesses) {
String uuid = (String) proc.get("uuid");
String procDefKeyString = getProcDefKeyFromUuid(uuid);
uuidAndProcDefKeyPair.put(uuid, procDefKeyString);
}

for (Map.Entry<String,Integer> procLimit : limitsPerProcs.entrySet()) {
Set<String> keys = uuidAndProcDefKeyPair.keySet();
int applyPerProcsCap = 0;
for (String key : keys) {

if (uuidAndProcDefKeyPair.get(key).equals(procLimit.getKey())) {
applyPerProcsCap = applyPerProcsCap + 1;
if (applyPerProcsCap > procLimit.getValue()) {
clearOutUnclaimedInst.add(key);
}
}
}
}

for (String removeUuidFromList : clearOutUnclaimedInst) {
uuidAndProcDefKeyPair.remove(removeUuidFromList);
}

Set<String> uuidKeys = uuidAndProcDefKeyPair.keySet();
// after its filtered add the uuids to rowUuids arraylist
for (String key : uuidKeys) {
rowUuids.add(key);
}

// make query that uses multi limit per ProcDefkey (JOIN)
// iterate to grab 30
if (!rowUuids.isEmpty()) {
// Found some claimable rows, so now try to claim them..
//
for (String uuid : rowUuids) {
claimUuid = UUID.randomUUID().toString();
int updateCount = jdbcTemplate.update(UPDATE_CLAIMABLE_ROW_SQL,
new Object[] {workerId, claimUuid, uuid, workerId});

if (updateCount == 1) {
numClaimed++;
claimUuids.add(claimUuid);
claimedRowUuids.add(uuid);
//log.debug("CLAIMED " + claimUuid + " (uuid=" +uuid+") for procDefKey '" + procDefKeyList + "'");
}

if (numClaimed == limit) {
break; // we have claimed up to the limit, so stop claiming
}
}

if (numClaimed == 0) {
// other workers beat us to claiming the rows
log.warn("Attempted to claim " + rowUuids.size() + " rows for procDefKeys '" + workerProcsList.keySet() + "', but claimed none! " +
(attempts < 10 ? "Retrying..." : "GIVING UP!"));
continue; // retry finding claimable rows
}
else {
log.debug("Claimed (" + numClaimed + " of " + rowUuids.size() + ") for procDefKeys '" + workerProcsList.keySet() + "'");
}
}
else if (log.isTraceEnabled()) {
log.trace("NO CLAIMABLE CANDIDATES AT THIS TIME");
}

break; // no retry needed
}
catch (DeadlockLoserDataAccessException e) {
if (attempts == 10) {
log.error("Caught a DeadlockLoserDataAccessException. NOT Retrying as 10 attempts have been tried already!..");
break; // give up
}
log.warn("Caught a DeadlockLoserDataAccessException. Retrying..");
continue; // retry
}
catch (Throwable t) {
log.error("Unexpected exception. Not retrying..", t);
break; // abort
}
} // end while (attempts)

long timeTaken = System.currentTimeMillis() - t0;
if (timeTaken > SLOW_WARN_THRESHOLD) {
log.warn("CLAIM cws_sched_worker_proc_inst took " + timeTaken + " ms!");
}
if (numClaimed >= 1) {
log.info("worker " + workerId + " claimed " + numClaimed + " row(s).");
}
else {
log.trace("no rows claimed by worker: " + workerId);
}

if (numClaimed != claimUuids.size()) {
log.error("numUpdated != claimUuids.size()" );
}

Map<String,List<String>> ret = new HashMap<String,List<String>>();
ret.put("claimUuids", claimUuids);
ret.put("claimedRowUuids", claimedRowUuids);

return ret;
}
}

for (String removeUuidFromList : clearOutUnclaimedInst) {
uuidAndProcDefKeyPair.remove(removeUuidFromList);
}

Set<String> uuidKeys = uuidAndProcDefKeyPair.keySet();
// after its filtered add the uuids to rowUuids arraylist
for (String key : uuidKeys) {
rowUuids.add(key);
}

// make query that uses multi limit per ProcDefkey (JOIN)
// iterate to grab 30
if (!rowUuids.isEmpty()) {
// Found some claimable rows, so now try to claim them..
//
for (String uuid : rowUuids) {
claimUuid = UUID.randomUUID().toString();
int updateCount = jdbcTemplate.update(UPDATE_CLAIMABLE_ROW_SQL,
new Object[]{workerId, claimUuid, uuid, workerId});

if (updateCount == 1) {
numClaimed++;
claimUuids.add(claimUuid);
claimedRowUuids.add(uuid);
//log.debug("CLAIMED " + claimUuid + " (uuid=" +uuid+") for procDefKey '" + procDefKeyList + "'");
}

if (numClaimed == limit) {
break; // we have claimed up to the limit, so stop claiming
}
}

if (numClaimed == 0) {
// other workers beat us to claiming the rows
log.warn("Attempted to claim " + rowUuids.size() + " rows for procDefKeys '" + workerProcsList.keySet() + "', but claimed none! " +
(attempts < 10 ? "Retrying..." : "GIVING UP!"));
continue; // retry finding claimable rows
} else {
log.debug("Claimed (" + numClaimed + " of " + rowUuids.size() + ") for procDefKeys '" + workerProcsList.keySet() + "'");
}
} else if (log.isTraceEnabled()) {
log.trace("NO CLAIMABLE CANDIDATES AT THIS TIME");
}

break; // no retry needed
} catch (DeadlockLoserDataAccessException e) {
if (attempts == 10) {
log.error("Caught a DeadlockLoserDataAccessException. NOT Retrying as 10 attempts have been tried already!..");
break; // give up
}
log.warn("Caught a DeadlockLoserDataAccessException. Retrying..");
continue; // retry
} catch (Throwable t) {
log.error("Unexpected exception. Not retrying..", t);
break; // abort
}
} // end while (attempts)

long timeTaken = System.currentTimeMillis() - t0;
if (timeTaken > SLOW_WARN_THRESHOLD) {
log.warn("CLAIM cws_sched_worker_proc_inst took " + timeTaken + " ms!");
}
if (numClaimed >= 1) {
log.info("worker " + workerId + " claimed " + numClaimed + " row(s).");
} else {
log.trace("no rows claimed by worker: " + workerId);
}

if (numClaimed != claimUuids.size()) {
log.error("numUpdated != claimUuids.size()");
}

Map<String, List<String>> ret = new HashMap<String, List<String>>();
ret.put("claimUuids", claimUuids);
ret.put("claimedRowUuids", claimedRowUuids);

return ret;
}


public String getProcInstRowStatus(String uuid) {
List<Map<String,Object>> list = jdbcTemplate.queryForList(
"SELECT status FROM cws_sched_worker_proc_inst " +
"WHERE uuid=?",
new Object[] {uuid});
if (list != null && !list.isEmpty()) {
return list.iterator().next().values().iterator().next().toString();
}
else {
return null;
}
}
List<Map<String, Object>> list = jdbcTemplate.queryForList(
"SELECT status FROM cws_sched_worker_proc_inst " +
"WHERE uuid=?",
new Object[]{uuid});
if (list != null && !list.isEmpty()) {
return list.iterator().next().values().iterator().next().toString();
} else {
return null;
}
}

public int getMaxProcsValueForWorker(String workerId) {
return jdbcTemplate.queryForObject(
"SELECT max_num_running_procs FROM cws_worker WHERE id=?",
new Object[] {workerId}, Integer.class);
}
return jdbcTemplate.queryForObject(
"SELECT max_num_running_procs FROM cws_worker WHERE id=?",
new Object[]{workerId}, Integer.class);
}

public int getCountForClaimedProcInstPerKey(String procDefKey, List<String> claimedUuids) {
String listOfClaimUuid = "\"" + String.join("\", \"", claimedUuids) + "\"" ;
String query = "SELECT count(*) FROM cws_sched_worker_proc_inst " + "WHERE proc_def_key='" + procDefKey + "' " + "AND claim_uuid IN (" + listOfClaimUuid + ")";
return jdbcTemplate.queryForObject(query, Integer.class);
}
String listOfClaimUuid = "\"" + String.join("\", \"", claimedUuids) + "\"";
String query = "SELECT count(*) FROM cws_sched_worker_proc_inst " + "WHERE proc_def_key='" + procDefKey + "' " + "AND claim_uuid IN (" + listOfClaimUuid + ")";
return jdbcTemplate.queryForObject(query, Integer.class);
}

public List<Map<String, Object>> getProcDefKeyLatestCompleteInst(String procDefKey) {
return jdbcTemplate.queryForList(
Expand Down Expand Up @@ -1133,7 +1121,6 @@ public List<Map<String, Object>> getProcessInstanceStats(String lastNumHours) {

List<Map<String, Object>> camundaAndCwsStatuses = jdbcTemplate.queryForList(query, time, time, time, time);

<<<<<<< HEAD
ret.addAll(camundaAndCwsStatuses);

return ret;
Expand Down Expand Up @@ -1641,8 +1628,3 @@ public int retryFailedToStart(List<String> uuids) {
return jdbcTemplate.update(query);
}
}
=======
return jdbcTemplate.update(query);
}
}
>>>>>>> aa10384 (Jason work to fix worker priority scheduling)

0 comments on commit 4ec0a8f

Please sign in to comment.