Class AsyncStdoutReader: asynchronous non-blocking stdout reading

Post your working scripts, libraries and tools.
teadrinker
Posts: 4602
Joined: 29 Mar 2015, 09:41
Contact:

Class AsyncStdoutReader: asynchronous non-blocking stdout reading

Post by teadrinker » 30 Oct 2023, 20:33

AsyncStdoutReader class for asynchronous non-blocking reading of the stdout stream

The class is used to create a console application process without a console window and read its stdout stream. The stdout stream is read in asynchronous non-blocking mode.
The process corresponding to the command line passed to the class constructor is created when an instance of the class is created.

The code was developed using RegisterWaitCallback.ahk by @lexikos and some ideas prompted by @Helgef and @swagfag, thanks to them!

Code: Select all

class AsyncStdoutReader
{
/*
AsyncStdoutReader: creates a console process without a console window and reads its
stdout/stderr through an overlapped named pipe, without blocking the script.

Constructor parameters:
cmd:      the command line of the process to be created. The stdout stream of this process will be read asynchronously.
callback: a function that will be called when a new portion of data is written to stdout.
          The function accepts three parameters:
              PID — ID of the process whose stdout is passed
              str — next data chunk from stdout
              state — current state of writing data to stdout, can be 0 (incomplete), 1 (complete) and -1 (timed out)
timeout:  maximum time in milliseconds to wait for a new data portion in stdout
encoding: sometimes it is necessary to specify the encoding in which to read text from stdout, in most cases this parameter can be omitted.

Properties:
processID — PID of the created process
complete  — true once the end of the stream has been reached (or reading was aborted)
outData   — all text decoded so far

Throws OSError if the event, the pipe, the process or the wait registration cannot be created.

Notes:
- Each reader uses its own pipe name (script PID + tick count + counter), so several readers
  can be created back to back.
- For UTF-8 ('utf-8', 'cp65001' or 65001) a multi-byte character that is split between two
  reads is held back and decoded together with the next chunk. Other multi-byte (DBCS)
  code pages are still decoded chunk by chunk.
- state 1 is reported once; no "timed out" notification is sent after completion.
*/
    __New(cmd, callback?, timeout?, encoding?) {
        this.event := AsyncStdoutReader.Event()
        ; carry/carryLen hold the bytes of an incomplete trailing UTF-8 sequence between reads.
        this.params := {
            encoding: encoding ?? 'cp' . DllCall('GetOEMCP'),
            overlapped: Buffer(A_PtrSize * 3 + 8, 0),
            callback: callback ?? unset,
            hEvent: this.event.handle,
            timeout: timeout ?? unset,
            startTime: A_TickCount,
            buf: Buffer(4096, 0),
            carry: Buffer(4, 0),
            carryLen: 0,
            utf8: false,
            complete: false,
            outData: ''
        }
        this.params.utf8 := AsyncStdoutReader.IsUtf8(this.params.encoding)
        this.process := AsyncStdoutReader.Process(cmd, this.params)
        this.signal := AsyncStdoutReader.EventSignal(this.process, this.params)
        this.params.processID := this.processID
        this.process.Read()
    }

    processID => this.process.PID
    complete => this.params.complete
    outData => this.params.outData

    __Delete() {
        ; The constructor may have thrown before the process object existed; nothing to undo then.
        if !this.HasOwnProp('process')
            return
        DllCall('CancelIoEx', 'Ptr', this.process.hPipeRead, 'Ptr', this.params.overlapped)
        ; Wake the registered wait and give its posted message a chance to be handled.
        this.event.Set()
        Sleep 50
        (this.HasOwnProp('signal') && this.signal.Clear())
        ; The process handle is still open here, so the PID cannot have been reused by another process.
        ProcessClose(this.processID)
        this.process.Clear()
        this.params.buf.Size := 0
        this.params.outData := ''
    }

    ; --- internal helpers shared by Process and EventSignal ---

    ; True if enc names the UTF-8 code page.
    static IsUtf8(enc) => enc = 'utf-8' || enc = 'cp65001' || enc = 65001

    ; Schedules callback(pid, str, state) on a one-shot timer, if a callback was supplied.
    static Notify(info, pid, str, state) {
        if info.HasProp('callback')
            SetTimer(info.callback.Bind(pid, str, state), -10)
    }

    ; Returns how many bytes at the end of data[0 .. len) belong to an unfinished UTF-8 sequence (0..3).
    static Utf8TailLength(data, len) {
        loop Min(3, len) {
            b := NumGet(data, len - A_Index, 'UChar')
            if (b & 0xC0) = 0x80    ; continuation byte: keep looking for the lead byte
                continue
            if b < 0xC0             ; ASCII: the data ends on a character boundary
                return 0
            need := b >= 0xF0 ? 4 : b >= 0xE0 ? 3 : 2
            return need > A_Index ? A_Index : 0
        }
        return 0
    }

    ; Decodes the first `size` bytes of info.buf, prefixed by any bytes carried over from the
    ; previous chunk. For UTF-8 an incomplete trailing sequence is moved to info.carry instead
    ; of being decoded.
    static Decode(info, size) {
        data := info.buf, len := size
        if info.carryLen {
            data := Buffer(info.carryLen + size)
            DllCall('RtlMoveMemory', 'Ptr', data, 'Ptr', info.carry, 'UPtr', info.carryLen)
            (size && DllCall('RtlMoveMemory', 'Ptr', data.Ptr + info.carryLen, 'Ptr', info.buf, 'UPtr', size))
            len := info.carryLen + size
            info.carryLen := 0
        }
        if info.utf8 && (tail := AsyncStdoutReader.Utf8TailLength(data, len)) {
            DllCall('RtlMoveMemory', 'Ptr', info.carry, 'Ptr', data.Ptr + len - tail, 'UPtr', tail)
            info.carryLen := tail
            len -= tail
        }
        return len ? StrGet(data, len, info.encoding) : ''
    }

    ; Decodes whatever is left in info.carry (used when the stream ends in the middle of a character).
    static Flush(info) {
        if !(n := info.carryLen)
            return ''
        info.carryLen := 0
        return StrGet(info.carry, n, info.encoding)
    }

    ; Thin wrapper around an auto-reset, initially non-signaled Win32 event.
    class Event
    {
        __New() {
            this.handle := DllCall('CreateEvent', 'Ptr', 0, 'Int', 0, 'Int', 0, 'Ptr', 0, 'Ptr')
            if !this.handle
                throw OSError()
        }
        __Delete() => (this.handle && DllCall('CloseHandle', 'Ptr', this.handle))
        Set() => DllCall('SetEvent', 'Ptr', this.handle)
    }

    ; Owns the pipe and the child process, and issues the overlapped reads.
    class Process
    {
        __New(cmd, info) {
            this.info := info
            this.hPipeRead := this.hPipeWrite := this.hProcess := 0
            this.CreatePipes()
            this.PID := this.CreateProcess(cmd)
            if !this.PID {
                throw OSError('Failed to create process')
            }
        }

        ; Creates the overlapped read end (this.hPipeRead) and an inheritable write end (this.hPipeWrite).
        CreatePipes() {
            static FILE_FLAG_OVERLAPPED := 0x40000000, PIPE_ACCESS_INBOUND := 0x1
                 , pipeMode := 0    ; PIPE_TYPE_BYTE | PIPE_WAIT
                 , GENERIC_WRITE := 0x40000000, OPEN_EXISTING := 0x3
                 , FILE_ATTRIBUTE_NORMAL := 0x80, HANDLE_FLAG_INHERIT := 0x1
                 , INVALID_HANDLE_VALUE := -1, serial := 0

            ; A_TickCount alone repeats for readers created within the same tick, and the pipe
            ; allows a single instance, so add the script's PID and a per-script counter.
            pipeName := '\\.\pipe\StdOut_' . DllCall('GetCurrentProcessId', 'UInt') . '_' . A_TickCount . '_' . ++serial

            hRead := DllCall('CreateNamedPipe', 'Str', pipeName,
                'UInt', PIPE_ACCESS_INBOUND | FILE_FLAG_OVERLAPPED, 'UInt', pipeMode, 'UInt', 1,
                'UInt', this.info.buf.Size, 'UInt', this.info.buf.Size, 'UInt', 120000, 'Ptr', 0, 'Ptr')
            if hRead = INVALID_HANDLE_VALUE
                throw OSError()
            this.hPipeRead := hRead

            hWrite := DllCall('CreateFile', 'Str', pipeName, 'UInt', GENERIC_WRITE, 'UInt', 0, 'Ptr', 0,
                'UInt', OPEN_EXISTING, 'UInt', FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED, 'Ptr', 0, 'Ptr')
            if hWrite = INVALID_HANDLE_VALUE {
                err := A_LastError
                this.Clear()
                throw OSError(err)
            }
            this.hPipeWrite := hWrite
            DllCall('SetHandleInformation', 'Ptr', this.hPipeWrite, 'UInt', HANDLE_FLAG_INHERIT, 'UInt', HANDLE_FLAG_INHERIT)
        }

        ; Starts cmd with stdout and stderr redirected to the pipe. Returns the PID, or 0 on failure.
        CreateProcess(cmd) {
            static STARTF_USESTDHANDLES := 0x100, CREATE_NO_WINDOW := 0x8000000
            STARTUPINFO := Buffer(siSize := A_PtrSize * 9 + 32, 0)
            NumPut('UInt', siSize, STARTUPINFO)
            NumPut('UInt', STARTF_USESTDHANDLES, STARTUPINFO, A_PtrSize * 4 + 28)
            ; hStdOutput and hStdError are the last two fields of STARTUPINFO.
            NumPut('Ptr', this.hPipeWrite, 'Ptr', this.hPipeWrite, STARTUPINFO, siSize - A_PtrSize * 2)

            PROCESS_INFORMATION := Buffer(A_PtrSize * 2 + 8, 0)
            if !DllCall('CreateProcess', 'Ptr', 0, 'Str', cmd, 'Ptr', 0, 'Ptr', 0, 'UInt', true,
                'UInt', CREATE_NO_WINDOW, 'Ptr', 0, 'Ptr', 0, 'Ptr', STARTUPINFO, 'Ptr', PROCESS_INFORMATION) {
                this.Clear()
                return 0
            }
            ; Our copy of the write end must be closed, otherwise the read never reports end-of-stream.
            DllCall('CloseHandle', 'Ptr', this.hPipeWrite), this.hPipeWrite := 0
            ; Keep the process handle until Clear(); the thread handle is not needed.
            this.hProcess := NumGet(PROCESS_INFORMATION, 0, 'Ptr')
            DllCall('CloseHandle', 'Ptr', NumGet(PROCESS_INFORMATION, A_PtrSize, 'Ptr'))
            return NumGet(PROCESS_INFORMATION, A_PtrSize * 2, 'UInt')
        }

        ; Issues overlapped reads until one is left pending (its completion signals info.hEvent)
        ; or the stream ends. Reads that complete immediately are delivered right away.
        Read() {
            static ERROR_IO_PENDING := 997
            info := this.info, buf := info.buf, overlapped := info.overlapped
            loop {
                overlapped.__New(overlapped.Size, 0)
                NumPut('Ptr', info.hEvent, overlapped, A_PtrSize * 2 + 8)
                if DllCall('ReadFile', 'Ptr', this.hPipeRead, 'Ptr', buf, 'UInt', buf.Size, 'UIntP', &size := 0, 'Ptr', overlapped) {
                    this.Deliver(size)
                    continue
                }
                if A_LastError != ERROR_IO_PENDING
                    this.Complete()
                return
            }
        }

        ; Decodes `size` freshly read bytes, appends them to outData and notifies the callback (state 0).
        Deliver(size) {
            info := this.info
            info.startTime := A_TickCount
            info.outData .= str := AsyncStdoutReader.Decode(info, size)
            AsyncStdoutReader.Notify(info, this.PID, str, 0)
        }

        ; Marks the stream as finished and notifies the callback (state 1) exactly once.
        Complete() {
            info := this.info
            if info.complete
                return
            info.complete := true
            info.outData .= rest := AsyncStdoutReader.Flush(info)
            AsyncStdoutReader.Notify(info, this.PID, rest, 1)
        }

        ; Closes every handle that is still open; safe to call more than once.
        Clear() {
            for prop in ['hPipeRead', 'hPipeWrite', 'hProcess'] {
                if h := this.%prop% {
                    DllCall('CloseHandle', 'Ptr', h)
                    this.%prop% := 0
                }
            }
        }
    }

    ; Waits (via the thread pool) for the read-completion event and processes finished reads.
    class EventSignal
    {
        __New(stdOut, info) {
            this.info := info
            this.stdOut := stdOut
            this.onEvent := ObjBindMethod(this, 'Signal')
            timeout := info.HasProp('timeout') ? info.timeout : -1
            this.regWait := this.RegisterWaitCallback(this.info.hEvent, this.onEvent, timeout)
        }

        ; Called on the script's thread when the event was signaled or the wait timed out.
        Signal(handle, timedOut) {
            static ERROR_IO_INCOMPLETE := 996
            info := this.info
            if info.complete    ; nothing more to report once the stream has ended
                return
            if timedOut {
                AsyncStdoutReader.Notify(info, info.processID, '', -1)
                return
            }
            if !DllCall('GetOverlappedResult', 'Ptr', this.stdOut.hPipeRead, 'Ptr', info.overlapped, 'UIntP', &size := 0, 'UInt', false) {
                ; The event was signaled by an earlier read; the current one is still pending.
                if A_LastError = ERROR_IO_INCOMPLETE
                    return this.Rearm()
                return this.stdOut.Complete()
            }
            this.stdOut.Deliver(size)
            this.stdOut.Read()
            ; Read() may have reached the end of the stream; do not wait (or time out) after that.
            if !info.complete
                this.Rearm()
        }

        ; Registers a new one-shot wait for the next read completion.
        Rearm() {
            info := this.info
            timeout := info.HasProp('timeout') ? Max(0, info.timeout - (A_TickCount - info.startTime)) : -1
            this.regWait := this.RegisterWaitCallback(info.hEvent, this.onEvent, timeout)
        }

        ; Unregisters the wait and breaks the reference cycle through the bound method.
        Clear() {
            (this.HasOwnProp('regWait') && this.regWait.Unregister())
            this.DeleteProp('regWait')
            this.DeleteProp('onEvent')
        }

        RegisterWaitCallback(handle, callback, timeout := -1) {
            ; by lexikos https://www.autohotkey.com/boards/viewtopic.php?t=110691
            ; A small machine-code stub runs on the thread-pool thread and posts nmsg to a
            ; message-only window; messaged() then invokes callback(handle, timedOut) on the script's thread.
            static waitCallback, postMessageW, wnd, nmsg := 0x5743
            if !IsSet(waitCallback) {
                a := A_PtrSize = 8 ? 0x8BCAB60F44C18B48 : 0x448B50082444B60F
                b := A_PtrSize = 8 ? 0x498B48C18B4C1051 : 0x70FF0870FF500824
                c := A_PtrSize = 8 ? 0x0000000020FF4808 : 0x0008C2D0FF008B04
                NumPut('int64', a, 'int64', b, 'int64', c, waitCallback := Buffer(24))
                DllCall('VirtualProtect', 'ptr', waitCallback, 'ptr', 24, 'uint', 0x40, 'uint*', 0)
                hLib := DllCall('GetModuleHandle', 'str', 'user32', 'ptr')
                postMessageW := DllCall('GetProcAddress', 'ptr', hLib, 'astr', 'PostMessageW', 'ptr')
                wnd := Gui(), DllCall('SetParent', 'ptr', wnd.hwnd, 'ptr', -3)    ; HWND_MESSAGE = -3
                OnMessage(nmsg, messaged, 255)
            }
            NumPut('ptr', postMessageW, 'ptr', wnd.hwnd, 'uptr', nmsg, param := AsyncStdoutReader.EventSignal.RegisteredWait())
            NumPut('ptr', ObjPtr(param), param, A_PtrSize * 3)
            param.callback := callback, param.handle := handle
            if !DllCall('RegisterWaitForSingleObject', 'ptr*', &waitHandle := 0,
                'ptr', handle, 'ptr', waitCallback, 'ptr', param, 'uint', timeout, 'uint', 8)
                throw OSError()
            param.waitHandle := waitHandle, param.locked := ObjPtrAddRef(param)
            return param
            static messaged(wParam, lParam, nmsg, hwnd) {
                if hwnd = wnd.hwnd {
                    local param := ObjFromPtrAddRef(NumGet(wParam + A_PtrSize * 3, 'ptr'))
                    try (param.callback)(param.handle, lParam)
                    param._unlock()
                }
            }
        }

        ; Parameter block handed to the thread-pool wait; keeps itself alive while the wait is pending.
        class RegisteredWait extends Buffer
        {
            static prototype.waitHandle := 0, prototype.locked := 0
            __new() => super.__new(A_PtrSize * 5, 0)
            __delete() => this.Unregister()
            _unlock() {
                (p := this.locked) && (this.locked := 0, ObjRelease(p))
            }
            Unregister() {
                wh := this.waitHandle, this.waitHandle := 0
                (wh) && DllCall('UnregisterWaitEx', 'ptr', wh, 'ptr', -1)
                this._unlock()
            }
        }
    }
}
Usage examples:

Code: Select all

; Usage example: show the output of a ping command in a GUI while it is being produced.
#Requires AutoHotkey v2
Persistent

; Called without arguments (PID = 0), ReadOutput only creates/resets its window and returns.
ReadOutput()
; Start the command; ReadOutput is then invoked with (PID, str, state) for each chunk of output.
reader := AsyncStdoutReader('ping -n 8 google.com', ReadOutput)

; Callback for AsyncStdoutReader that mirrors the process output in a GUI.
;   PID   — process ID passed by the reader; 0 (the default) only creates/resets the window
;   str   — text chunk to append to the edit control
;   state — 0 while output is still arriving, -1 on timeout, any other value when complete
; When state is non-zero and the global `reader` is still set, the accumulated output is
; shown in a message box and `reader` is cleared.
ReadOutput(PID := 0, str := '', state := 0) {
    global reader
    static EM_SETSEL := 0xB1, win := '', status := '', box := ''

    ; Build the window lazily, on the very first call.
    if !win {
        win := Gui('+Resize', 'Async reading of stdout')
        win.MarginX := win.MarginY := 0
        win.SetFont('s12', 'Consolas')
        win.AddText('x10 y10', 'Complete: ')
        status := win.AddText('x+5 yp w100', 'false')
        box := win.AddEdit('xm y+10 w650 h500')
        box.GetPos(, &top)
        ; Keep the edit control filling the area below the status line.
        win.OnEvent('Size', (o, m, w, h) => box.Move(,, w, h - top))
        win.OnEvent('Close', (*) => ExitApp())
        win.Show()
    }

    ; No PID: reset the display and stop.
    if !PID {
        status.Value := 'false'
        box.Value := ''
        return
    }

    if state = -1
        label := 'timed out'
    else if state = 0
        label := 'false'
    else
        label := 'true'
    status.Value := label

    ; Move the caret to the end of the edit control, then append the new chunk.
    SendMessage(EM_SETSEL, -2, -1, box)
    EditPaste(str, box)

    if !(reader && state)
        return
    outData := reader.outData
    reader := ''
    MsgBox(outData, 'Complete stdout', 0x2040)
}

Code: Select all

; Usage example: collect the whole output of a command and show it once reading has finished.
#Requires AutoHotkey v2
Persistent

; The reader is kept in a one-element array; the callback reads reader[1].outData and then
; clears reader[1], which drops this reference to the reader.
reader := [AsyncStdoutReader('cmd /c cd /?', (pid, str, state) => (
    ; when state is 1 (complete) or -1 (timed out), output full stdout
    state && (stdout := reader[1].outData, reader[1] := '', MsgBox(stdout, 'Console command CD help info'))
))]
Last edited by teadrinker on 04 Nov 2023, 13:52, edited 1 time in total.

neogna2
Posts: 635
Joined: 15 Sep 2016, 15:44

Re: Asynchronous non-blocking stdout reading

Post by neogna2 » 04 Nov 2023, 05:43

Thank you @teadrinker

I suggest renaming the thread title "class AsyncStdoutReader: Asynchronous non-blocking stdout reading" to make it easier to find through forum search.

For context, here are some links to earlier functions, and to discussions of the issues with them that teadrinker's class attempts to overcome:
SKAN's v1 RunCMD (and alpha/temporary v2 with an issue)
tranht17's RunTerminal (v2 port of v1 RunCMD with same issue)

edit: SKAN's new v2 RunCMD released 2024-10-14
Last edited by neogna2 on 16 Oct 2024, 16:46, edited 1 time in total.

teadrinker
Posts: 4602
Joined: 29 Mar 2015, 09:41
Contact:

Re: Asynchronous non-blocking stdout reading

Post by teadrinker » 04 Nov 2023, 13:50

neogna2 wrote: I suggest renaming the thread
Renamed.

squadjot
Posts: 40
Joined: 17 Nov 2024, 08:55

Re: Class AsyncStdoutReader: asynchronous non-blocking stdout reading

Post by squadjot » 05 Dec 2024, 11:00

It works quite well, so far i can successfully run two ping commands simultaneously, only thing is, i needed to put Sleep 1 between each call.

Questions:
1. Is there a way to just get lines?
I made this to come around the problem

2. Does the AsyncStdoutReader read StdErr too? ( does not look like it does )
I know i can force the comspec to pipe errors to StdOut ( which would be necessary for apps like FFMPEG because it outputs progressinfo to StdErr )

Code: Select all

; Starts obj.command through AsyncStdoutReader and splits its output into complete lines.
;   obj — object with a `command` property (the command line to run)
; Returns the AsyncStdoutReader instance.
; Lines end at "`n"; a preceding "`r" is stripped, so both CRLF and LF output is handled.
; Text after the last line break stays in the buffer until more output arrives.
RunCmdAsync(obj) {
    ; global wv

    ; AsyncStdoutReader delivers arbitrary chunks, not lines, so unfinished lines are buffered per PID
    static linebuffers := Map()

    ; Launch AsyncStdoutReader
    reader := AsyncStdoutReader(obj.command, ReadOutput)

    ; Create the line buffer for this process
    if !linebuffers.Has(reader.processID)
        linebuffers[reader.processID] := ""

    ReadOutput(PID := 0, str := "", state := 0) {

        ; A callback can still arrive after the buffer was deleted on completion;
        ; indexing the Map with a missing key would throw, so ignore such calls.
        if !linebuffers.Has(PID)
            return

        ; Append the received output to the buffer
        tempstr := linebuffers[PID] . str

        ; Process the buffer line by line
        while pos := InStr(tempstr, "`n") {

            ; Extract a single line, without its line ending
            line := RTrim(SubStr(tempstr, 1, pos - 1), "`r")

            ; Remove the processed line from the buffer
            tempstr := SubStr(tempstr, pos + 1)

            ; Send the completed line
           ; wv.PostWebMessageAsJSON(JSON.stringify({pid: PID, output: line, state: state}))
        }

        ; Update the associated buffer
        linebuffers[PID] := tempstr

        ; The script still requires this to avoid issues
        ; NOTE(review): presumably this reference makes the closure capture `reader`, keeping
        ; the reader alive even if the caller discards the return value — confirm.
        if reader {

        }

        ; Delete the line buffer once reading has finished (complete or timed out)
        if( state ) {
            linebuffers.Delete(PID)
        }
    }
    return reader
}
Last edited by squadjot on 10 Dec 2024, 11:12, edited 1 time in total.

teadrinker
Posts: 4602
Joined: 29 Mar 2015, 09:41
Contact:

Re: Class AsyncStdoutReader: asynchronous non-blocking stdout reading

Post by teadrinker » 06 Dec 2024, 09:45

squadjot wrote: Is there a way to just get lines?
I'm not quite sure what you mean, please clarify.
squadjot wrote: Does the AsyncStdoutReader read StdErr too?
Yes, in my code this.hPipeWrite is used for both stdout and stderr.

squadjot
Posts: 40
Joined: 17 Nov 2024, 08:55

Re: Class AsyncStdoutReader: asynchronous non-blocking stdout reading

Post by squadjot » 09 Dec 2024, 12:31

Ok, thanks
teadrinker wrote:
06 Dec 2024, 09:45
I'm not quite sure what you mean, please clarify.
I wanted to know if your script could somehow only output lines ( complete lines )
If i'm not mistaken, your script outputs characters, based on bytes, right?

Anyways In WSH JScript reading lines would look something like this:

Code: Select all

...
// Read the process output one line at a time until the end of the stream is reached.
// NOTE(review): oExec and doSomethingWIthLine are defined outside this fragment.
while (!oExec.StdOut.AtEndOfStream) {
    	var line = oExec.StdOut.ReadLine();
	doSomethingWIthLine(line)
}
So that's what I was asking for, but I'm not sure I need your answer on that.
I'm kind of new with AHK, but my script, for parsing the lines, seems to work, so i'm all good and your script is fine as it is. :D

Again, thanks for AsyncStdoutReader, it will come in handy.

User avatar
thqby
Posts: 594
Joined: 16 Apr 2021, 11:18
Contact:

Re: Class AsyncStdoutReader: asynchronous non-blocking stdout reading

Post by thqby » 09 Dec 2024, 22:27

@teadrinker

Code: Select all

; Reproduction script: when started with an argument it acts as the child process and writes
; test data to stdout; without arguments it starts itself as a child and reads that output.
#SingleInstance Off
if A_Args.Length {
; Child: '*' opens stdout for writing.
std := FileOpen('*', 'w', 'utf-8')
; 4 * 1024 + 20 bytes, all filled with the letter 'a'.
buf := Buffer(4 * 1024 + 20, Ord('a'))
p := buf.Ptr
; Write two non-ASCII characters as UTF-8 starting 2 bytes before offset 4096, so the first
; character's bytes lie on both sides of that offset (4096 is the reader's buffer size).
StrPut('☀☁', p + 4 * 1024 - 2, 6, 'utf-8')
std.RawWrite(buf)
; NOTE(review): Read(0) is presumably used here to flush the buffered output — confirm.
std.Read(0)
ExitApp()
}
; Parent: create/reset the output window, then run this same script as the child.
ReadOutput()
reader := AsyncStdoutReader('"' A_AhkPath '" "' A_ScriptFullPath '" 1', ReadOutput, , 'utf-8')
When a multi-byte Unicode character does not fit completely into the read buffer (it is split across two reads), the expected result is not obtained.

teadrinker
Posts: 4602
Joined: 29 Mar 2015, 09:41
Contact:

Re: Class AsyncStdoutReader: asynchronous non-blocking stdout reading

Post by teadrinker » 10 Dec 2024, 05:16

@squadjot
Reading the output stream line by line contradicts the logic of this approach, here it is asynchronous data reading, but ReadLine() is a blocking method.
@thqby
Yes, as I understand, multibyte characters are cut if they hit the buffer boundary. I think this drawback can be eliminated by analyzing the bytes at the end of the buffer. I'll try to fix that later, thanks for your attention.

squadjot
Posts: 40
Joined: 17 Nov 2024, 08:55

Re: Class AsyncStdoutReader: asynchronous non-blocking stdout reading

Post by squadjot » 10 Dec 2024, 12:17

@teadrinker
I understand and appreciate that this approach is about asynchronous data reading, and I agree that a blocking method like ReadLine() isn't suitable here. My mention of it was more about referencing the concept and asking if there’s a built-in way to achieve something similar asynchronously. In most cases, it makes sense to process data line by line, especially since programs that output to streams are often designed to produce data in line-based chunks as well. It's no biggie, i can just build my own line parser.

As mentioned, I think your script is great as it is (a fix for multibyte characters would be awesome, though)

User avatar
thqby
Posts: 594
Joined: 16 Apr 2021, 11:18
Contact:

Re: Class AsyncStdoutReader: asynchronous non-blocking stdout reading

Post by thqby » 10 Dec 2024, 20:04

https://github.com/thqby/ahk2_lib/blob/master/child_process.ahk#L196-L222

I'm currently using this solution, and I don't know if there is a better solution.

squadjot
Posts: 40
Joined: 17 Nov 2024, 08:55

Re: Class AsyncStdoutReader: asynchronous non-blocking stdout reading

Post by squadjot » 11 Dec 2024, 07:24

@thqby
Thanks!, i didn't know you had a script too, i will also try that.

Post Reply

Return to “Scripts and Functions (v2)”