child_process: asynchronously reading a child process's stdout/stderr

thqby

06 Mar 2022, 01:36

Create a child process, and read stdout/stderr asynchronously, supporting multiple stdin inputs.

The asynchronous read mode is implemented with I/O completion ports and works with any handle opened for overlapped I/O, for example a handle returned by the CreateFile function with the FILE_FLAG_OVERLAPPED flag.
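As a rough illustration of the mechanism, the sketch below reads the start of a file with overlapped I/O through the OVERLAPPED class listed later in this post. It assumes that class is saved as OVERLAPPED.ahk in a Lib folder; the file being read, the 4096-byte buffer and the `on_read` name are arbitrary choices for the example, not part of the library.

```autohotkey
#Requires AutoHotkey v2.0
#Include <OVERLAPPED>	; assumed: the OVERLAPPED class from this post, saved in a Lib folder

; GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING, FILE_FLAG_OVERLAPPED
hFile := DllCall('CreateFile', 'str', A_ScriptFullPath, 'uint', 0x80000000, 'uint', 1,
	'ptr', 0, 'uint', 3, 'uint', 0x40000000, 'ptr', 0, 'ptr')
if hFile = -1
	throw OSError()
; Route completion notifications for this handle to the script's message loop.
OVERLAPPED.EnableIoCompletionCallback(hFile)

buf := Buffer(4096)
ol := OVERLAPPED(on_read)	; on_read is called when the read completes or fails
; ReadFile may return 0 with ERROR_IO_PENDING (997); that is the normal asynchronous case.
if !DllCall('ReadFile', 'ptr', hFile, 'ptr', buf, 'uint', buf.Size, 'ptr', 0, 'ptr', ol)
	&& A_LastError != 997
	throw OSError()
Persistent	; keep the script alive until the callback has run

on_read(ol, err, bytes) {
	DllCall('CloseHandle', 'ptr', hFile)
	if err
		MsgBox('read failed, error ' err)
	else
		MsgBox(bytes ' bytes read:`n' StrGet(buf, bytes, 'utf-8'))
	ExitApp
}
```

The script does not block on the read; the callback is delivered as a window message once the operation has completed.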

Code:

/************************************************************************
 * @description Create a child process, and read stdout/stderr
 * asynchronously, supporting multiple stdin inputs.
 * @author thqby
 * @date 2024/05/24
 * @version 2.0.0
 ***********************************************************************/

class child_process {
	/** @type {Integer} */
	pid := 0
	/** @type {Integer} */
	hProcess := 0
	/** @type {File} */
	stdin := 0
	/** @type {child_process.AsyncPipeReader} */
	stdout := 0, stderr := 0

	/**
	 * Create a child process and capture its stdout/stderr output.
	 * @param {String} command The name of the module to be executed or the command line to be executed.
	 * @param {Array<String>} [args] List of string arguments.
	 * @param {Object} [options] An object with optional properties.
	 * @param {String} [options.cwd] Current working directory of the child process.
	 * @param {String} [options.input] The value is passed through stdin to the child process, and stdin is then closed.
	 * @param {String|Array<String>} [options.encoding='cp0'] The encoding(s) of stdin/stdout/stderr.
	 * @param {Integer} [options.hide=true] Hide the subprocess window that would normally be created on Windows systems.
	 * @param {Integer} [options.flags] Flags that control the priority class and the creation of the process.
	 * Defaults to `DllCall('GetPriorityClass', 'ptr', -1, 'uint')`.
	 * 
	 * @example <caption>Wait for the subprocess to exit and read stdout.</caption>
	 * ping := child_process('ping -n 1 autohotkey.com')
	 * ping.Wait()
	 * MsgBox(ping.stdout.Read())
	 * 
	 * @example <caption>Read and write stdout/stdin many times</caption>
	 * cmd := child_process('cmd.exe')
	 * stdin := cmd.stdin, stdout := cmd.stdout, stdout.complete := false
	 * stdout.onData := (this, str) => RegExMatch(str, '(^|`n)(\s*\w:[^>]*>)$', &m) ?
	 * 	this.Append(SubStr(str, this.complete := 1, -m.Len[2])) : this.Append(str)
	 * write_and_read(cmd := '') {
	 * 	(cmd && stdin.Write(cmd), stdin.Read(0))
	 * 	while !stdout.complete
	 * 		Sleep(10)
	 * 	MsgBox(stdout.Read()), stdout.complete := false
	 * }
	 * write_and_read(), write_and_read('dir c:\`n')
	 * write_and_read('ping -n 1 autohotkey.com`n')
	 * cmd.Terminate()
	 */
	__New(command, args?, options?) {
		hide := true, flags := DllCall('GetPriorityClass', 'ptr', -1, 'uint')
		encoding := encoding_in := encoding_out := encoding_err := 'cp0'
		input := unset, cwd := params := ''
		if IsSet(options)
			for k, v in options.OwnProps()
				%k% := v
		flags |= hide ? 0x08000000 : 0
		if encoding is Array {
			for i, v in ['in', 'out', 'err']
				encoding.Has(i) ? encoding_%v% := encoding[i] : 0
		} else encoding_in := encoding_out := encoding_err := encoding
		if IsSet(args) {
			if args is Array {
				for v in args
					params .= ' ' escapeparam(v)
			} else params := args
		} else if SubStr(command, 1, 1) = '"' || !FileExist(command)
			params := command, command := ''

		if !DllCall('CreatePipe', "ptr*", &stdinR := 0, "ptr*", &stdinW := 0, 'ptr', 0, 'uint', 0)
			Throw OSError()
		(handles := [stdinR]).__Delete := closehandles
		this.stdin := FileOpen(stdinW, 'h', encoding_in)
		static mFlags_offset := (VerCompare(A_AhkVersion, '2.1-alpha.3') >= 0 ? 6 : 4) * A_PtrSize + 8, USEHANDLE := 0x10000000
		; remove USEHANDLE flag, auto close handle
		NumPut('uint', NumGet(p := ObjPtr(this.stdin), mFlags_offset, 'uint') & ~USEHANDLE, p, mFlags_offset)
		this.stdout := child_process.AsyncPipeReader('stdout', this, encoding_out)
		this.stderr := child_process.AsyncPipeReader('stderr', this, encoding_err)

		static x64 := A_PtrSize = 8
		STARTUPINFO := Buffer(sz := x64 ? 104 : 68, 0)
		PROCESS_INFORMATION := Buffer(x64 ? 24 : 16, 0)
		NumPut('uint', sz, STARTUPINFO), NumPut('uint', 0x100, STARTUPINFO, x64 ? 60 : 44)
		NumPut('ptr', stdinR, 'ptr', stdoutW := this.stdout.DeleteProp('hPipeW'), 'ptr',
			stderrW := this.stderr.DeleteProp('hPipeW'), STARTUPINFO, sz - A_PtrSize * 3)
		handles.Push(stdoutW, stderrW)
		for h in handles
			DllCall('SetHandleInformation', 'ptr', h, 'int', 1, 'int', 1)

		if !DllCall('CreateProcess', 'ptr', command ? StrPtr(command) : 0, 'ptr', params ? StrPtr(params) : 0, 'ptr', 0, 'int', 0,
			'int', true, 'int', flags, 'int', 0, 'ptr', cwd ? StrPtr(cwd) : 0, 'ptr', STARTUPINFO, 'ptr', PROCESS_INFORMATION)
			Throw OSError()
		handles.Push(NumGet(PROCESS_INFORMATION, A_PtrSize, 'ptr')), handles := 0
		this.hProcess := NumGet(PROCESS_INFORMATION, 'ptr'), this.pid := NumGet(PROCESS_INFORMATION, 2 * A_PtrSize, 'uint')

		if IsSet(input)
			this.stdin.Write(input), this.stdin.Read(0), this.stdin := unset

		closehandles(handles) {
			for h in handles
				DllCall('CloseHandle', 'ptr', h)
		}
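		; Quote an argument for CreateProcess: escape embedded quotes and double any
		; backslashes that precede a quote or the end of a quoted argument.
		escapeparam(s) {
			s := StrReplace(s, '"', '\"', , &c)
			return c || RegExMatch(s, '[\s\v]') ? '"' RegExReplace(s, '(\\*)(?=(\\"|$))', '$1$1') '"' : s
		}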
	}
	__Delete() => this.hProcess && (DllCall('CloseHandle', 'ptr', this.hProcess), this.hProcess := 0)
	/**
	 * Wait for the process to exit.
	 * @returns 0 (false) if the wait timed out, otherwise 1.
	 */
	Wait(timeout := -1) {
		hProcess := this.hProcess, t := A_TickCount, r := 258
		while timeout && 1 == r := DllCall('MsgWaitForMultipleObjects', 'uint', 1, 'ptr*', hProcess, 'int', 0, 'uint', timeout, 'uint', 7423, 'uint')
			(timeout == -1) || timeout := Max(timeout - A_TickCount + t, 0), Sleep(-1)
		if r == 0xffffffff
			Throw OSError()
		return r == 258 || !timeout ? 0 : 1
	}
	; Terminate the process. TerminateProcess takes an exit code as its second argument; 0 is used here.
	Terminate() => this.hProcess && DllCall('TerminateProcess', 'ptr', this.hProcess, 'uint', 0)
	ExitCode => (DllCall('GetExitCodeProcess', 'ptr', this.hProcess, 'uint*', &code := 0), code)

	class AsyncPipeReader {
		/** @event onData */
		static Prototype.onData := (this, data) => this.Append(data)
		/** @event onClose */
		static Prototype.onClose := (this) => 0
		static Prototype.data := ''
		__New(name, process, codepage := 0) {
			if -1 == this.Ptr := DllCall('CreateNamedPipe', 'str', pn := '\\.\pipe\' name ObjPtr(this), 'uint', 0x40000001,
				'uint', 0, 'uint', 1, 'uint', 0, 'uint', 0, 'uint', 0, 'ptr', 0, 'ptr')
				Throw OSError()
			OVERLAPPED.EnableIoCompletionCallback(this)
			root := ObjPtr(this), process := ObjPtr(process), ol := OVERLAPPED(onConnect)
			err := !DllCall('ConnectNamedPipe', 'ptr', this, 'ptr', this._overlapped := ol) && A_LastError
			if err && err != 997
				Throw OSError()
			this.name := name
			this.hPipeW := DllCall('CreateFile', 'str', pn, 'uint', 0x40000000,
				'uint', 0, 'ptr', 0, 'uint', 0x3, 'uint', 0x40000080, 'ptr', 0, 'ptr')
			this.DefineProp('process', { get: (*) => ObjFromPtrAddRef(process) })

			onConnect(ol, err, *) {
				static bufsize := 16 * 1024
				local apr := ObjFromPtrAddRef(root)
				if !err {
					(buf := Buffer(16 + bufsize)).used := 0, ol.Call := onRead
					switch codepage, false {
						case -1:
							emit := emit_buf
							apr.DefineProp('Append', { call: append_buf })
								.DefineProp('Read', { call: this => (v := this.data, this.data := Buffer(), v) })
								.data := Buffer()
						case 1200, 'utf-16': emit := emit_str.Bind(utf16Reader)	; 1200 is the UTF-16LE code page
						case 'utf-8': emit := emit_str.Bind(MultiByteReader(65001))
						default:
							if SubStr(codepage, 1, 2) = 'cp'
								codepage := Integer(SubStr(codepage, 3))
							emit := emit_str.Bind(MultiByteReader(codepage || DllCall('GetACP', 'uint')))
					}
					err := !DllCall('ReadFile', 'ptr', apr, 'ptr', buf,
						'uint', bufsize, 'ptr', 0, 'ptr', ol) && A_LastError
				}
				switch err {
					case 997, 0xC0000120, 0:	; ERROR_IO_PENDING, STATUS_CANCELLED
					case 109, 0xC000014B:		; ERROR_BROKEN_PIPE, STATUS_PIPE_BROKEN
						emit_close(apr)
					default: Throw OSError(err)
				}
				onRead(ol, err, byte) {
					local apr := ObjFromPtrAddRef(root)
					if !err {
						emit(apr, buf, byte + buf.used)
						err := !DllCall('ReadFile', 'ptr', apr, 'ptr', buf.Ptr + buf.used,
							'uint', bufsize, 'ptr', 0, 'ptr', ol) && A_LastError
					}
					switch err {
						case 997, 0xC0000120, 0:
						case 109, 0xC000014B:
							emit_close(apr)
						default: Throw OSError(err)
					}
				}
			}
			static emit_close(apr) => (apr.onClose(), apr.DeleteProp('onClose'), apr.DeleteProp('onData'), apr.__Delete())
			static emit_buf(apr, buf, byte) => apr.onData({ Ptr: buf.Ptr, Size: byte })
			; byte already includes buf.used (added by onRead), so it must not be added again
			static emit_str(reader, apr, buf, byte) => apr.onData(reader(buf, byte))
			static append_buf(apr, buf) {
				data := apr.data, used := data.Size, data.Size += sz := buf.Size
				; RtlMoveMemory(Destination, Source, Length): copy the new chunk to the end of the cache
				DllCall('RtlMoveMemory', 'ptr', data.Ptr + used, 'ptr', buf, 'uptr', sz)
			}
			static utf16Reader(buf, size) {
				; StrGet takes a length in characters, so halve the byte count
				str := StrGet(buf, size >> 1, 'utf-16')
				(buf.used := size & 1) && NumPut('char', NumGet(buf, size - 1, 'char'), buf)
				return str
			}
			static MultiByteReader(codepage) {
				if !DllCall('GetCPInfo', 'uint', codepage, 'ptr', info := Buffer(18))
					Throw OSError()
				MaxCharSize := NumGet(info, 'uint')
				return reader
				reader(buf, size) {
					lpbyte := buf.Ptr
					if !l := DllCall('MultiByteToWideChar', 'uint', codepage, 'uint', 0, 'ptr', lpbyte, 'int', size, 'ptr', 0, 'int', 0, 'int')
						return ''
					VarSetStrCapacity(&str, l)
					DllCall('MultiByteToWideChar', 'uint', codepage, 'uint', 0, 'ptr', lpbyte, 'int', size, 'ptr', p := StrPtr(str), 'int', l, 'int')
					if (NumGet(p, (--l) <<= 1, 'ushort') == 0xfffd &&
						!DllCall('MultiByteToWideChar', 'uint', codepage, 'uint', 8, 'ptr', lpbyte, 'int', size, 'ptr', 0, 'int', 0, 'int') &&
						size - MaxCharSize < (n := DllCall('WideCharToMultiByte', 'uint', codepage, 'uint', 0, 'ptr', p, 'int', l >> 1, 'ptr', 0, 'int', 0, 'ptr', 0, 'ptr', 0)) &&
						buf.used := size - n)
						NumPut('int64', NumGet(lpbyte, n, 'int64'), lpbyte)
					else buf.used := 0, l += 2
					NumPut('short', 0, p + l)
					VarSetStrCapacity(&str, -1)
					return str
				}
			}
		}
		__Delete() {
			if this.Ptr == -1
				return
			r := DllCall('CancelIoEx', 'ptr', this, 'ptr', 0)
			DllCall('CloseHandle', 'ptr', this), r && Sleep(200)
			this.Ptr := -1
		}

		isClosed => this.Ptr == -1

		/** @type {child_process} */
		process => 0

		/**
		 * Read the cached data and clear the cache.
		 * @returns {Buffer|String}
		 */
		Read() => this.DeleteProp('data')

		; Default behavior when the onData callback function is not set, append data to the cache.
		Append(data) {
			this.data .= data
		}
	}
}
#Include <OVERLAPPED>

Code:

/************************************************************************
 * @description An OVERLAPPED struct implemented with io completion ports
 * for asynchronously overlapping IO. It can be used to asynchronously read
 * and write files, pipes, http, and sockets.
 * @author thqby
 * @date 2024/05/24
 * @version 1.0.0
 ***********************************************************************/

class OVERLAPPED extends Buffer {
	/**
	 * The struct used in asynchronous (or overlapped) input and output (I/O).
	 * The specified callback function is called when the asynchronous operation completes or fails.
	 */
	static Call(cb := (this, err, byte) => 0) {
		static size := 4 * A_PtrSize + 8
		hEvent := DllCall('CreateEvent', 'ptr', 0, 'int', 1, 'int', 0, 'ptr', 0, 'ptr')
		NumPut('ptr', hEvent, 'ptr', ObjPtr(obj := super(size, 0)), obj, size - 2 * A_PtrSize)
		obj.Call := cb
		return obj
	}
	static EnableIoCompletionCallback(hFile) {
		static code := init()
		if !DllCall('BindIoCompletionCallback', 'ptr', hFile, 'ptr', code, 'uint', 0)
			Throw OSError()
		init() {
			static g := Gui(), offset := 3 * A_PtrSize + 8
			DllCall('SetParent', 'ptr', hwnd := g.Hwnd, "ptr", -3)
			msg := DllCall('RegisterWindowMessage', 'str', 'AHK_Overlapped_IO_Completion', 'uint')
			pSend := DllCall('GetProcAddress', 'ptr', DllCall('GetModuleHandle', 'str', 'user32', 'ptr'), 'astr', 'SendMessageW', 'ptr')
			OnMessage(msg, (overlapped, err, *) => ObjFromPtrAddRef(NumGet(overlapped, offset, 'ptr'))(err, NumGet(overlapped, A_PtrSize, 'uptr')) || 1, 255)
			; 0xnnnnnnnn is used as a placeholder for the compiler to generate corresponding instructions
			/*
			#include <windows.h>
			void CALLBACK OverlappedIOCompletion(DWORD err, DWORD bytes, LPOVERLAPPED overlapped) {
				((decltype(&SendMessageW))0x1111111111111111)((HWND)0x2222222222222222, (UINT)0x33333333, (WPARAM)overlapped, (LPARAM)err);
			}*/
			if A_PtrSize = 8 {
				NumPut(
					; 44 8b c9   mov r9d, ecx ; err
					; ba 00 00 00 00 mov edx, 0 ; msg
					'uint', 0xbac98b44, 'uint', msg,
					; 48 b9 00 00 00 00
					;  00 00 00 00 mov rcx, 0 ; hwnd
					'ushort', 0xb948, 'ptr', hwnd,
					; 48 b8 00 00 00 00
					;  00 00 00 00 mov rax, 0 ; SendMessageW
					'ushort', 0xb848, 'ptr', pSend,
					; 48 ff e0   rex_jmp rax
					'uint', 0xe0ff48, code := Buffer(32))
			} else {
				NumPut(
					; ff 74 24 04  push DWORD PTR _err$[esp-4]
					'uint', 0x042474ff,
					; ff 74 24 10  push DWORD PTR _overlapped$[esp]
					'uint', 0x102474ff,
					; 68 00 00 00 00 push 0     ; msg
					'uchar', 0x68, 'uint', msg,
					; 68 00 00 00 00 push 0     ; hwnd
					'uchar', 0x68, 'ptr', hwnd,
					; b8 00 00 00 00 mov eax, 0    ; SendMessageW
					'uchar', 0xb8, 'ptr', pSend,
					; ff d0   call eax
					; c2 0c 00  ret 12
					'int64', 0x0cc2d0ff, code := Buffer(32))
			}
			DllCall('VirtualProtect', 'ptr', code, 'ptr', 32, 'uint', 0x40, 'uint*', 0)
			return code
		}
	}
	Cancel(hFile) {
		if !DllCall('CancelIoEx', 'ptr', hFile, 'ptr', this)
			Throw OSError()
	}
	Clear() => DllCall('RtlZeroMemory', 'ptr', this, 'uptr', 2 * A_PtrSize + 8)
	Reset() => (ev := NumGet(this, 2 * A_PtrSize + 8, 'ptr')) && DllCall('ResetEvent', 'ptr', ev)
	__Delete() => (ev := NumGet(this, 2 * A_PtrSize + 8, 'ptr')) && DllCall('CloseHandle', 'ptr', ev)
}
Examples:

Code:

MsgBox 'run ahk script and read stdout'
ahk := child_process(A_AhkPath, ['*'], { input: 'FileOpen("*", "w").Write("a string from child process.")' })
ahk.Wait()
MsgBox ahk.stdout.Read()
ahk := unset

MsgBox 'run cmd.exe and call some commands'
cmd := child_process('cmd.exe')
stdin := cmd.stdin, stdout := cmd.stdout, stdout.complete := false
stdout.onData := (this, str) => RegExMatch(str, '(^|`n)(\s*\w:[^>]*>)$', &m) ?
	this.Append(SubStr(str, this.complete := 1, -m.Len[2])) : this.Append(str)
write_and_read(cmd := '') {
	(cmd && stdin.Write(cmd), stdin.Read(0))
	while !stdout.complete
		Sleep(10)
	s := stdout.Read(), stdout.complete := false
	return s
}

write_and_read()
MsgBox 'list "c:\" files and folders'
MsgBox write_and_read('dir c:\`n')
MsgBox 'ping autohotkey.com'
MsgBox write_and_read('ping -n 1 autohotkey.com`n')
MsgBox 'read stderr'
write_and_read('dir /zzz`n')
MsgBox 'error: ' cmd.stderr.Read()
MsgBox 'terminate process'
stdin := stdout := 0
cmd.Terminate()
cmd := unset
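
One more sketch, based on the `Wait(timeout)`, `Terminate()` and `ExitCode` members defined in the class above: give the child two seconds to finish, then stop it. The two-second limit and the ping command are arbitrary, and the include line assumes the class is saved as child_process.ahk in a Lib folder.

```autohotkey
#Requires AutoHotkey v2.0
#Include <child_process>	; assumed: the class above, saved in a Lib folder

proc := child_process('ping -n 10 autohotkey.com')
if !proc.Wait(2000) {	; Wait returns 0 when the timeout elapses first
	proc.Terminate()
	proc.Wait()	; block until the process has really exited
}
MsgBox 'exit code: ' proc.ExitCode '`n`n' proc.stdout.Read()
```

Output that arrived before the process was stopped is still available from `stdout.Read()`.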
Last edited by thqby on 24 May 2024, 23:40, edited 4 times in total.
neogna2

Re: child_process, capture child process's stdout/stderr outputs

06 Mar 2022, 05:49

edit2: thank you for this thqby.
In the example script I had to set the encoding option to match my system's code page (cp850, Western Europe) for all output characters to display correctly:
cmd := child_process('cmd.exe', , { cwd: A_Desktop, encoding: 'cp850' })
thqby

06 Mar 2022, 09:48

Does cmd.exe not use the system default character set?
or FileOpen(file, 'r', 'cp0').Encoding != 'cp850'?
neogna2

06 Mar 2022, 11:09

thqby wrote:
06 Mar 2022, 09:48
Does cmd.exe not use the system default character set?
or FileOpen(file, 'r', 'cp0').Encoding != 'cp850'?
I'm easily confused by code page/encoding matters, so expect errors in anything I say about them. I think the incorrect characters I saw in the example script's MsgBox output of text from cmd (before adding encoding: 'cp850' to the options) are related to the two different code pages in Windows (ANSI and OEM) and the specifics of my PC's Windows 10 install.
https://en.wikipedia.org/wiki/Windows_code_page
https://docs.microsoft.com/en-us/windows/win32/api/winnls/nf-winnls-getoemcp
https://docs.microsoft.com/en-us/windows/win32/api/winnls/nf-winnls-getacp
Here are some test lines, with my PC's results in the comments:

Code:

TestFile := "C:\ahk\test.txt"       ;earlier saved as UTF-8 (no BOM)
f := FileOpen(TestFile, 'r', 'cp0')
ff := FileOpen(TestFile, 'r')
MsgBox f.Encoding                   ;"CP1252"
MsgBox ff.Encoding                  ;"CP1252"
f.Close(), ff.Close()
MsgBox DllCall("GetOEMCP", "UInt")  ;"850" --> cp850
MsgBox DllCall("GetACP", "UInt")    ;"1252" --> cp1252
ExitApp
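
Following on from the test above, a possible way to avoid hard-coding the value is to build the encoding name from GetOEMCP. This is only a sketch: it assumes the child writes its output in the OEM code page, as cmd.exe appeared to do on this PC, and that the class from the first post is saved as child_process.ahk in a Lib folder.

```autohotkey
#Requires AutoHotkey v2.0
#Include <child_process>	; assumed: the class from the first post, saved in a Lib folder

; Build an encoding name such as "cp850" from the system's OEM code page.
oem := 'cp' DllCall('GetOEMCP', 'uint')
cmd := child_process('cmd.exe /c dir', , { encoding: oem })
cmd.Wait()
MsgBox oem '`n`n' cmd.stdout.Read()
```

Programs that write UTF-8 or the ANSI code page would still need their own encoding value.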
Saiapatsu

06 Mar 2022, 22:22

Thank you so much for this module, the async/functional behavior is awesome.
I think this is what will motivate me to finally port my scripts from 2a to 2b.
(I have not tried this script yet, but will after I do that)

However, I'd like to point out that escapeparam() is a little bit naive.
Right now, it merely escapes quotes and surrounds the string in quotes if it has a space in it. But a trailing backslash will escape the trailing quote!

Escape all quotes and double their leading backslashes.
Regex replace
(\\*)"
with
$1$1\\"

If the string contains a space or tab or such, surround it in quotes and double all trailing backslashes (i.e., double the trailing quote's leading backslashes).
Regex replace
\\*$
with
$0$0"
and prepend a quote.

Furthermore, if the command line goes through cmd (i.e. isn't for CreateProcess), escape all cmd special characters with a caret.
Regex replace
[()<>^&|%!"\n]
with
^$0
You don't use Run(), so you don't need this; it's here for completeness' sake.

Also, some programs (such as echo) don't split the command line using CommandLineToArgvW or similar.
Fortunately, passing a string instead of an array as args avoids escaping params.

See also: https://docs.microsoft.com/en-us/archive/blogs/twistylittlepassagesallalike/everyone-quotes-command-line-arguments-the-wrong-way
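
The rules above could be sketched in AutoHotkey v2 roughly as follows. The names `escape_arg` and `escape_cmd` are made up for this example, and one detail differs from the description: the argument is also wrapped in quotes when it contains a quote but no whitespace, which is equally valid and keeps the function short.

```autohotkey
#Requires AutoHotkey v2.0

; Quote one argument for programs that split their command line the way
; CommandLineToArgvW does.
escape_arg(s) {
	; Arguments without whitespace or quotes need no quoting (an empty one does).
	if s != '' && !RegExMatch(s, '[\s"]')
		return s
	; Double the backslashes in front of each quote, then escape the quote itself.
	s := RegExReplace(s, '(\\*)"', '$1$1\"')
	; Double trailing backslashes so they cannot escape the closing quote.
	s := RegExReplace(s, '(\\+)\z', '$1$1')
	return '"' s '"'
}

; Extra layer, only for command lines that pass through cmd.exe.
escape_cmd(s) => RegExReplace(s, '[()<>^&|%!"\n]', '^$0')

MsgBox escape_arg('C:\my dir\') '`n' escape_arg('say "hi"') '`n' escape_cmd('a&b')
```

In an AutoHotkey replacement string only `$` is special, so a single backslash is written before the quote here rather than the `\\` shown in the generic regex notation above.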
thqby

07 Mar 2022, 04:23

Saiapatsu wrote:
06 Mar 2022, 22:22
Escape all quotes and double their leading backslashes.
Regex replace
(\\*)"
with
$1$1\\"

If the string contains a space or tab or such, surround it in quotes and double all trailing backslashes (i.e., double the trailing quote's leading backslashes).
Regex replace
\\*$
with
$0$0"
and prepend a quote.
I don't know much about command line parameters. Thank you for your advice.

After modification

Code:

escapeparam(s) {
	s := StrReplace(s, '"', '\"', , &c)
	return c || RegExMatch(s, '[\s\v]') ? '"' RegexReplace(s, '(\\*)(?=(\\"|$))', '$1$1') '"' : s
}
hasantr

22 Apr 2022, 02:29

Does it allow the use of multiple processor cores? Or does the work still take place within the main process?
thqby

24 May 2024, 23:37

Asynchronous reading has been reimplemented in a more efficient, less power-hungry way, which breaks some code written for the previous version.
The callback functions are now set on stdout/stderr, and the output is no longer split line by line each time it is read.
Synchronous reading has also changed, as shown in the example.
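
For scripts that relied on line-by-line delivery, a sketch of how the splitting might be done inside an onData callback is shown below. The helper `split_lines`, the `rest` property and the callback names are made up for this example; only `onData`, `onClose` and `stdout` come from the class, which is assumed to be saved as child_process.ahk in a Lib folder.

```autohotkey
#Requires AutoHotkey v2.0
#Include <child_process>	; assumed: the class from the first post, saved in a Lib folder

; Split text into complete lines; the unfinished last piece is returned as `rest`.
split_lines(text) {
	lines := StrSplit(text, '`n', '`r')
	rest := lines.Length ? lines.Pop() : ''
	return { lines: lines, rest: rest }
}

on_data(reader, str) {
	r := split_lines(reader.rest str)	; prepend what was left over from the last chunk
	reader.rest := r.rest
	for line in r.lines
		OutputDebug('[stdout] ' line)
}

on_close(reader) {
	if reader.rest != ''	; flush a final line that had no trailing newline
		OutputDebug('[stdout] ' reader.rest)
	ExitApp
}

ping := child_process('ping -n 2 autohotkey.com')
ping.stdout.rest := ''
ping.stdout.onData := on_data
ping.stdout.onClose := on_close
Persistent	; keep the script alive while the callbacks are pending
```

Keeping the leftover text on the reader object itself means stdout and stderr can each use the same callbacks without sharing state.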
