(v2) How do I share an object between child threads?

Ask for help, how to use AHK_H, etc.
Qriist
Posts: 82
Joined: 11 Sep 2016, 04:02

(v2) How do I share an object between child threads?

Post by Qriist » 05 Feb 2024, 18:24

Okay, so I'm trying to replicate a multithreading "framework" in @thqby's v2 that I built in @HotKeyIt's v1.

Basically, I had the parent thread prepare a single nested object that all children could read from. They passed information around to each other without communicating through the main thread. This worked very well.

thqby's v2 removed CriticalObject and has a very different syntax so I'm kind of lost how to achieve the same net effect. I'll post my working v1 multithread code so you can see what I did there with the main thread and a child thread.

Main thread

Code: Select all

	processLiveScrapeQueue(){	;multithreaded
		xtObj := CriticalObject()
		xtObjPtr := CriticalObject(xtObj,1)	;extracts the pointer to our cross-thread object		
		CriSec := CriticalSection()
		
		;EnterCriticalSection(CriSec)
		for k,v in this.tempWorkerQueue{
			xtObj["workerQueue",v["intendedWorkerId"],v["jobName"]] := xtObj["readerQueue",v["intendedReaderId"],v["jobName"]] := xtObj["writerQueue",1,v["jobName"]] := v
			v := []
		}
		this.tempWorkerQueue := []	;break this reference so it doesn't bloat xtObj
		xtObj["ids","tweet"] := []
		xtObj["ids","quotedTweet"] := []
		xtObj["ids","replyToTweet"] := []
		xtObj["ids","user"] := []
		xtObj["ids","mentionedUser"] := []
		pendingJobs := xtObj["workerQueue"].count()
		xtObj["pendingJobs","worker"] := pendingJobs
		xtObj["pendingJobs","reader"] := pendingJobs
		xtObj["pendingJobs","writer"] := pendingJobs
		
		debug := 0
		threadScriptObj := []
		
		;spawn all allowed worker threads
		static workerScript := FileOpen(a_scriptdir "\lib\mudchirp_liveScrapeWorker.ahk","r").read()
		loop, % Min(this.concurrentLiveScrapes,pendingJobs) {	;
			workerId%a_index% := AhkThread(workerScript,xtObjPtr a_space CriSec a_space debug a_space a_index "")
			xtObj["currentLiveThreads","worker",a_index] := 1
		}
		
		;spawn the single reader thread 
		static readerScript := FileOpen(a_scriptdir "\lib\mudchirp_liveScrapeReader.ahk","r").read()
		loop,  % min(this.concurrentLiveScrapeReads,pendingJobs) {
			;pointer to cross-thread object (space) critical section  ... the space will stringify the numbers and create 4x A_Args
			readerId%a_index% := AhkThread(readerScript, xtObjPtr a_space CriSec a_space debug a_space a_index)	
			xtObj["currentLiveThreads","reader",a_index] := 1
			;xtObj["readerQueue",a_index] := []
		}
		
		;spawn the single writer thread to handle all SQLite writes
		;formatted in such a way as to allow additional writer threads in the future
		static writerScript := FileOpen(a_scriptdir "\lib\mudchirp_liveScrapeWriter.ahk","r").read()
		writerId1 := AhkThread(writerScript, xtObjPtr a_space CriSec a_space debug a_space 1)
		xtObj["currentLiveThreads","writer",1] := 1
		
		pendingJobs := []
		loop, {
			sleep, 1000	;checks for completion once every second
			readerCount := 0
			for k,v in xtObj["readerQueue"]
				readerCount += v.count()
			writerCount := 0
			for k,v in xtObj["writerQueue"]
				writerCount += v.count()
			workerCount := 0
			for k,v in xtObj["workerQueue"]
				workerCount += v.count()
			
			pendingJobs["1: worker"] := workerCount
			pendingJobs["2: reader"] := readerCount
			pendingJobs["3: writer"] := writerCount
			
			ToolTip, % st_printarr(pendingJobs)
		}  until ((pendingJobs["1: worker"] + pendingJobs["2: reader"] + pendingJobs["3: writer"]) = 0)
	}
}
Sample child thread (some code differences between thread types but they all followed this basic outline)

Code: Select all

xtObj := CriticalObject(a_args[1]) ; get CriticalObject from pointer
CriSec := a_args[2] ; get CriticalSection
debug := a_args[3]	; duh
readerId := a_args[4]	; duh
threadType := "reader"
sleep,1000
takenJobs := []
tempIds := []

loop{
	jobClaimed := 0
	jobIndex := ""
	job := ""
	for k,v in xtObj["readerQueue",readerId]{
		;msgbox % st_printArr(v)
		if (v["jobStatus"] != "pending reader")
			Continue 
		jobIndex := k
		job := v
		jobClaimed := 1
		job["jobStatus"] := "claimed by reader"
		break	;found a job

	}
	if (jobClaimed = 0){
		sleep,100
		Continue
	}

	job["queueForWriter"] := processJobFile(trim(job["output"],chr(34)), target)
	job["jobStatus"] := "pending writer"
	xtObj["readerQueue",readerId].delete(jobIndex)
	if (xtObj["readerQueue",readerId].count() = 0){
		ExitApp
	}
}
I'm not recreating this particular project in v2 so any help you can give doesn't need to exactly spit out the above variables. What I want to do is take this same concept of a shared object into v2. I just can't figure it out.

Any and all pointers would be welcome. Thanks!

User avatar
thqby
Posts: 408
Joined: 16 Apr 2021, 11:18
Contact:

Re: (v2) How do I share an object between child threads?

Post by thqby » 12 Feb 2024, 06:32

Objects that are shared through CriticalObject must be released before the thread exits; otherwise, exceptions or memory leaks will be caused when criticalobjects is released.
This is one of the reasons why I deleted it.

Post Reply

Return to “Ask for Help”