Commit 40418871 authored by Rob Hulswit's avatar Rob Hulswit

Merge branch 'feature/limit-worker-runtime' into 'master'

[backport?] limit runtime for queue workers

See merge request !239
parents 341f49ba d45e726e
......@@ -17,7 +17,7 @@ LOADLIB "mod::system/lib/internal/taskqueue.whlib";
LOADLIB "mod::system/lib/internal/tasks.whlib";
LOADLIB "mod::system/lib/services.whlib";
INTEGER restart_workers_after := 15 * 60 * 1000; //restart workers after
INTEGER nextworkeruid := 0;
BOOLEAN debug, profileworkers;
BOOLEAN manualjs, manualhs;
......@@ -193,6 +193,9 @@ STATIC OBJECTTYPE ManagedWorker
/// Sent an interrupt for terminating?
BOOLEAN sentinterrupt;
/// First task time
DATETIME workerfirsttask;
// ---------------------------------------------------------------------------
//
// Public variables
......@@ -266,17 +269,13 @@ STATIC OBJECTTYPE ManagedWorker
RETURN;
}
this->CloseProcess(TRUE);
this->CloseProcess(TRUE); //also scheduled CheckMoreWorkers
IF (ObjectExists(this->pvt_activetask))
{
RECORD out := ParseOutputForStackTrace(this->fulloutput);
FailTask(this->pvt_activetask, out.output, out.trace, FALSE);
}
//Rerun with a delay
RegisterTimedCallback(AddTimeToDate(1000, GetCurrentDatetime()), PTR CheckMoreWorkers);
RETURN;
}
}
......@@ -408,6 +407,9 @@ STATIC OBJECTTYPE ManagedWorker
THROW NEW Exception("We weren't expecting a task");
this->pvt_activetask := task;
IF(this->workerfirsttask = DEFAULT DATETIME)
this->workerfirsttask := GetCurrentDatetime(); //now the countdown for garbage collection starts
this->taskresolve(task->taskdata.executeinfo);
this->taskresolve := DEFAULT MACRO PTR;
}
......@@ -480,7 +482,16 @@ STATIC OBJECTTYPE ManagedWorker
this->fulloutput := "";
if (this->pvt_state = "terminating")
{
this->RequestShutdown();
}
ELSE IF(GetMsecsDifference(this->workerfirsttask, GetCurrentDatetime()) > restart_workers_after) //time to restart
{
IF (debug)
PRINT(`Restart long enough running worker ${this->uid} from cluster ${this->cluster}\n`);
this->RequestShutdown();
ScheduleMicroTask(PTR CheckMoreWorkers);
}
}
/** Request the worker to shut down whenever the current task is done, sends an interrupt
......@@ -1243,6 +1254,7 @@ MACRO ReportProfile()
RECORD args := ParseArguments(GetConsoleArguments(),
[ [ name := "failreschedule", type := "stringopt" ]
, [ name := "restartafter", type := "stringopt" ]
, [ name := "debug", type := "switch" ]
, [ name := "profile", type := "switch" ]
, [ name := "profileworkers", type := "switch" ]
......@@ -1252,7 +1264,7 @@ RECORD args := ParseArguments(GetConsoleArguments(),
IF(NOT RecordExists(args))
{
Print("Syntax: managedqueuemgr [--failreschedule <failreschedule>] [--debug] [--profile] [--profileworkers] [--manualjs] [--manualhs]\n");
Print("Syntax: managedqueuemgr [--failreschedule <failreschedule>] [--restartafter <secs>] [--debug] [--profile] [--profileworkers] [--manualjs] [--manualhs]\n");
SetConsoleExitCode(1);
RETURN;
}
......@@ -1267,6 +1279,12 @@ IF(args.profile)
SetupFunctionProfiling("managedqueuemgr", "managedqueuemgr");
RegisterTimedCallback(AddTimeToDate(30000, GetCurrentDatetime()), PTR ReportProfile);
}
IF(args.restartafter != "")
{
restart_workers_after := ToInteger(args.restartafter,-1) * 1000;
IF(restart_workers_after < 0)
THROW NEW Exception(`Illegal --restartafter value: ${args.restartafter}`);
}
OpenPrimary();
controller := NEW QueueMgr;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment