Basically, I had the parent thread prepare a single nested object that all children could read from. They passed information around to each other without communicating through the main thread. This worked very well.
thqby's v2 removed CriticalObject and has a very different syntax, so I'm kind of lost on how to achieve the same net effect. I'll post my working v1 multithreaded code so you can see what I did there with the main thread and a child thread.
Main thread
Code: Select all
; processLiveScrapeQueue
; Builds one shared cross-thread object (xtObj) from this.tempWorkerQueue, spawns the
; worker, reader and writer threads that consume it, then blocks (polling once per
; second) until the worker, reader and writer queues are all empty.
;
; Reads:  this.tempWorkerQueue, this.concurrentLiveScrapes, this.concurrentLiveScrapeReads
; Writes: this.tempWorkerQueue (reset to an empty array once its jobs are copied into xtObj)
; Each spawned thread receives four space-separated arguments:
;   <xtObj pointer> <CriSec> <debug> <thread id>
processLiveScrapeQueue(){ ;multithreaded
    xtObj := CriticalObject()
    xtObjPtr := CriticalObject(xtObj,1) ;extracts the pointer to our cross-thread object
    CriSec := CriticalSection()
    ;EnterCriticalSection(CriSec)

    ;register every job in all three queues; the SAME job object is stored in each queue,
    ;so a status change made through one queue is visible through the others
    for k,v in this.tempWorkerQueue{
        xtObj["workerQueue",v["intendedWorkerId"],v["jobName"]] := xtObj["readerQueue",v["intendedReaderId"],v["jobName"]] := xtObj["writerQueue",1,v["jobName"]] := v
    }
    this.tempWorkerQueue := [] ;break this reference so it doesn't bloat xtObj

    ;shared id collections, created empty here
    xtObj["ids","tweet"] := []
    xtObj["ids","quotedTweet"] := []
    xtObj["ids","replyToTweet"] := []
    xtObj["ids","user"] := []
    xtObj["ids","mentionedUser"] := []

    ;NOTE(review): this counts the entries directly under "workerQueue" (one per
    ;intendedWorkerId), not the individual jobs - confirm that is the intended figure
    pendingJobs := xtObj["workerQueue"].count()
    xtObj["pendingJobs","worker"] := pendingJobs
    xtObj["pendingJobs","reader"] := pendingJobs
    xtObj["pendingJobs","writer"] := pendingJobs
    debug := 0
    threadScriptObj := []

    ;spawn all allowed worker threads
    static workerScript := FileOpen(a_scriptdir "\lib\mudchirp_liveScrapeWorker.ahk","r").read()
    loop, % Min(this.concurrentLiveScrapes,pendingJobs) { ;
        workerId%a_index% := AhkThread(workerScript,xtObjPtr a_space CriSec a_space debug a_space a_index "")
        xtObj["currentLiveThreads","worker",a_index] := 1
    }

    ;spawn the reader thread(s)
    static readerScript := FileOpen(a_scriptdir "\lib\mudchirp_liveScrapeReader.ahk","r").read()
    loop, % min(this.concurrentLiveScrapeReads,pendingJobs) {
        ;pointer to cross-thread object (space) critical section ... the space will stringify the numbers and create 4x A_Args
        readerId%a_index% := AhkThread(readerScript, xtObjPtr a_space CriSec a_space debug a_space a_index)
        xtObj["currentLiveThreads","reader",a_index] := 1
        ;xtObj["readerQueue",a_index] := []
    }

    ;spawn the single writer thread to handle all SQLite writes
    ;formatted in such a way as to allow additional writer threads in the future
    static writerScript := FileOpen(a_scriptdir "\lib\mudchirp_liveScrapeWriter.ahk","r").read()
    writerId1 := AhkThread(writerScript, xtObjPtr a_space CriSec a_space debug a_space 1)
    xtObj["currentLiveThreads","writer",1] := 1

    ;from here on pendingJobs is reused as the per-stage progress table shown in the tooltip
    pendingJobs := []
    loop, {
        sleep, 1000 ;checks for completion once every second
        readerCount := 0
        for k,v in xtObj["readerQueue"]
            readerCount += v.count()
        writerCount := 0
        for k,v in xtObj["writerQueue"]
            writerCount += v.count()
        workerCount := 0
        for k,v in xtObj["workerQueue"]
            workerCount += v.count()
        pendingJobs["1: worker"] := workerCount
        pendingJobs["2: reader"] := readerCount
        pendingJobs["3: writer"] := writerCount
        ToolTip, % st_printarr(pendingJobs)
    } until ((pendingJobs["1: worker"] + pendingJobs["2: reader"] + pendingJobs["3: writer"]) = 0)

    ;hide the progress tooltip; otherwise the final all-zero table stays on screen
    ToolTip
}
}
Child thread (reader)
Code: Select all
; Reader thread script.
; Receives four arguments from the thread that spawned it:
;   a_args[1] pointer to the shared cross-thread object
;   a_args[2] critical section
;   a_args[3] debug flag
;   a_args[4] this reader's id (selects xtObj["readerQueue",readerId])
; Repeatedly claims the first job in its own queue whose jobStatus is "pending reader",
; runs processJobFile on it, marks it "pending writer", removes it from the reader queue,
; and exits once that queue is empty.
xtObj := CriticalObject(a_args[1]) ; get CriticalObject from pointer
CriSec := a_args[2] ; get CriticalSection
debug := a_args[3] ; debug flag
readerId := a_args[4] ; id of this reader thread
threadType := "reader"
sleep,1000
takenJobs := []
tempIds := []
loop{
    jobClaimed := 0
    jobIndex := ""
    job := ""
    for k,v in xtObj["readerQueue",readerId]{
        ;msgbox % st_printArr(v)
        if (v["jobStatus"] != "pending reader")
            Continue
        jobIndex := k
        job := v
        jobClaimed := 1
        job["jobStatus"] := "claimed by reader"
        break ;found a job
    }
    if (jobClaimed = 0){
        ;nothing claimable right now: if the queue is empty (or was never created for
        ;this reader id) there will never be work, so exit instead of polling forever;
        ;.count() yields 0 or an empty value in those cases, both of which are false
        if (!xtObj["readerQueue",readerId].count()){
            ExitApp
        }
        sleep,100 ;jobs exist but none is ready for the reader yet
        Continue
    }
    ;NOTE(review): target is not assigned anywhere in this script as shown - confirm
    ;where it is expected to come from
    job["queueForWriter"] := processJobFile(trim(job["output"],chr(34)), target)
    job["jobStatus"] := "pending writer"
    ;remove only after processing, outside the for-loop, so the enumeration above is
    ;never modified while it is running
    xtObj["readerQueue",readerId].delete(jobIndex)
    if (xtObj["readerQueue",readerId].count() = 0){
        ExitApp
    }
}
Any and all pointers would be welcome. Thanks!