How can a ThreadpoolIo completion routine access the output buffer of an async operation?

TLDR: I am trying to model an inverted IOCTL flow in which my application pends two asynchronous requests with the driver. The completion handlers for the two pended requests can run concurrently, and they must not corrupt the data returned by the driver.

Long post:
I’ve been experimenting with the ThreadpoolIo functions and ran into a problem: I can't access the output buffer in the completion routine when an asynchronous operation completes. Here’s what I’ve done so far:

Created a dedicated thread with the responsibility to:
Open a handle to the console
Create a ThreadPoolIo object
Send two asynchronous requests (using ReadFile) to the device (in this case terminal/console)
Wait for user input, and upon completion, trigger the completion routine
Ensure that the buffer populated by ReadFile is visible in the completion routine.
To avoid race conditions and data corruption, I cannot share a single buffer between two requests whose completion routines may run on two separate thread-pool threads. Introducing locks would serialize the completion routine. Therefore, I want the completion routines to run concurrently, with each request using its own buffer.

My questions are:
1. How can I access the data populated by ReadFile?
2. Is there any way to find out which async operation has completed (the 1st or the 2nd in my example)?

I know this is not a proper use of ThreadpoolIo. This is just an experiment to try out the ThreadpoolIo functions. The actual device that ReadFile will be issued to can’t be shown in the example for proprietary reasons.

#include <windows.h>
#include <iostream>

#define MAX_NUM_REQUEST (2)

VOID CALLBACK mycompletionRoutine(
    PTP_CALLBACK_INSTANCE pInstance, // See "Callback Termination Actions" section
    PVOID pvContext,
    PVOID pOverlapped,
    ULONG IoResult,
    ULONG_PTR NumberOfBytesTransferred,
    PTP_IO pIo)
{
    if (IoResult == NO_ERROR)
    {
        // How can I access the buff[0] and buff[1] in which readfile stores the data?
    }
    else
    {
        std::cout << "Completion routine failure" << std::endl;
    }
}

DWORD WINAPI ReadThread(LPVOID lpParam)
{
    // Open handle to existing file in overlapped mode
    HANDLE hFile = CreateFile( L"CONIN$",                // This refers to console input
        GENERIC_READ,          // open for reading
        0,                      // do not share
        NULL,                   // default security
        OPEN_EXISTING,             // open existing file only
        FILE_FLAG_OVERLAPPED,  // overlapped/async operation
        NULL);                  // no attr. template

    if (hFile == INVALID_HANDLE_VALUE)
    {
        std::cout << "Invalid Handle value" << std::endl;
        return -1;
    }

    // Buffers in which content from the console will be stored, at most 1024 characters each. There is a separate buffer for each read request.
    char buff[MAX_NUM_REQUEST][1024] = { 0 };

    // Create a thread pool object and associate it with a file handle.
    PTP_IO thread_pool_obj = CreateThreadpoolIo(hFile, mycompletionRoutine, NULL, NULL);
    if (thread_pool_obj == NULL)
    {
        DWORD status = GetLastError();
        std::cout << "Thread pool obj is NULL" << status << std::endl;
        return -1;
    }
   
    // Always issue two asynchronous requests at a time.
    while (1)
    {
        OVERLAPPED overlapped[MAX_NUM_REQUEST] = {};
        HANDLE overlappedEvent[MAX_NUM_REQUEST];
        for (int i = 0; i < MAX_NUM_REQUEST; i++)
        {
            // Start thread pool object, as per MSDN StartThreadpoolIo must be called before issuing read/write request
            StartThreadpoolIo(thread_pool_obj);

            // Issue an async read request

            overlappedEvent[i] = CreateEventW(NULL, FALSE, FALSE, NULL);
            overlapped[i].hEvent = overlappedEvent[i];
            bool ret = ReadFile(hFile, &buff[i], 100, NULL, &overlapped[i]);
            if (ret == 0)
            {
                DWORD status = GetLastError();
                if (status != ERROR_IO_PENDING)
                {
                    std::cout << "IO operation failed" << std::endl;
                    return -1;
                }
            }
        }
        Sleep(5000);

    }
    return 0;
}
int main()
{
    

    HANDLE ReadThreadHandle = CreateThread(NULL,0, ReadThread,NULL,0,NULL);

    if(ReadThreadHandle != NULL)
    {
        std::cout << "Thread creation is successful" << std::endl;

        // Wait until thread has terminated.
        WaitForSingleObject(ReadThreadHandle, INFINITE);

        // Close thread handle upon completion.
        CloseHandle(ReadThreadHandle);

        std::cout << "Program ran successfully" << std::endl;
    }
    else
    {
        std::cout << "Thread creation failed" << std::endl;
    }
    
    return 0;
}

typedef struct _BUFFER {
   OVERLAPPED Overlapped;
   char Buffer[1024];
} BUFFER, *PBUFFER;

BUFFER b;

// init b fields : Overlapped, allocate handle, zero memory, etc

bool ret = ReadFile(hFile, &buff[i], 100, NULL, &b.Overlapped;

VOID CALLBACK mycompletionRoutine(
    PTP_CALLBACK_INSTANCE pInstance, // See "Callback Termination Actions" section
    PVOID pvContext,
    PVOID pOverlapped,
    ULONG IoResult,
    ULONG_PTR NumberOfBytesTransferred,
    PTP_IO pIo)
{
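   PBUFFER pBuf = CONTAINING_RECORD(pOverlapped, BUFFER, Overlapped);
   // pBuf->Buffer now holds the data read by this particular request;
   // NumberOfBytesTransferred says how many of its bytes are valid.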
}

Thank you. I guess the second argument passed to ReadFile is a typo, but the overall idea makes sense.

A typo? It looks like Doron copied your code.

The point is that whenever you need to attach additional context to an overlapped IO operation, you create a structure that contains an OVERLAPPED plus your extra data. Then you pass its OVERLAPPED to ReadFile, WriteFile, DeviceIoControl, etc. The OS neither knows nor cares about your extra data, but when the completion happens, you can recover the enclosing structure and consume it.
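To make the idea concrete, here is a portable sketch of the same pattern. It uses a stand-in FakeOverlapped type in place of the real OVERLAPPED so that it compiles without windows.h, and a hypothetical PerIoData struct. The containing_record function does the pointer arithmetic that the CONTAINING_RECORD macro performs: it subtracts the member's offset from the member's address.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>

// Stand-in for the Win32 OVERLAPPED structure (assumption: used only so this
// sketch compiles on any platform; the real type comes from <windows.h>).
struct FakeOverlapped {
    std::uintptr_t Internal;
    std::uintptr_t InternalHigh;
    void*          hEvent;
};

// Hypothetical per-request context: the OVERLAPPED plus our "extra stuff".
struct PerIoData {
    FakeOverlapped Overlapped;   // the only part the OS ever sees
    char           Buffer[1024]; // each request owns its own buffer
    int            id;           // which request this is (0 or 1)
};

// Recover the enclosing PerIoData from the OVERLAPPED pointer handed to the
// completion routine, using the same arithmetic as CONTAINING_RECORD.
PerIoData* containing_record(void* pOverlapped) {
    assert(pOverlapped != nullptr);
    char* p = static_cast<char*>(pOverlapped);
    return reinterpret_cast<PerIoData*>(p - offsetof(PerIoData, Overlapped));
}

// What a completion routine body might do: copy out the valid bytes and
// report which request finished.
int handle_completion(void* pOverlapped, std::size_t bytes, char* out, std::size_t outSize) {
    PerIoData* io = containing_record(pOverlapped);
    std::size_t n = bytes < outSize ? bytes : outSize;
    std::memcpy(out, io->Buffer, n);
    return io->id;
}
```

Each request carries its own buffer, so two completions running on different pool threads never touch the same memory and no lock is needed. The id field answers the "which request finished?" question.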


Seriously, you are pointing out typos in copy-pasted sample code? How about I fix one of your bugs while I am at it and specify the proper buffer size:

bool ret = ReadFile(hFile, b.Buffer, sizeof (b.Buffer), NULL, &b.Overlapped);

Thank you, Doron and Mbond2, for your explanations.

Follow-up question:
How can I make sure that my application always has two requests pended?
Once the thread issues the asynchronous ReadFile operations (using a for loop), it can wait for the events using:

WaitForMultipleObjects(
    MAX_NUM_REQUEST,  // number of objects in array
    overlappedEvent,  // array of objects
    FALSE,            // wait for any object
    INFINITE);

However, the above code waits for only one event. Once that event is signaled, the loop issues two new async operations, which makes a total of three async operations in the pending state. I just wanted to know if there’s a better way to do it.

WFMO will return when a request completes. You should then process the results and resubmit that ONE request. If each completion results in two new requests, what you have created is a denial-of-service attack. You’ll bring the system down as the requests increase exponentially.
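Here is a toy model of the difference. It assumes a hypothetical device that completes every pending request once per "round". Resubmitting two requests per completion doubles the pending count each round, while resubmitting one keeps it constant.

```cpp
#include <cassert>
#include <cstdint>

// Toy model (assumption: every pending request completes once per round).
// resubmitPerCompletion = how many new ReadFile calls each completion triggers.
std::uint64_t pending_after(std::uint64_t initial, unsigned resubmitPerCompletion, unsigned rounds) {
    assert(resubmitPerCompletion >= 1);
    std::uint64_t pending = initial;
    for (unsigned r = 0; r < rounds; ++r) {
        std::uint64_t completed = pending;            // all pending requests finish
        pending = completed * resubmitPerCompletion;  // each one triggers N new reads
    }
    return pending;
}
```

With two resubmissions per completion, 2 initial requests become 2048 after ten rounds. With one resubmission, the count stays at 2.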

Certainly, I agree. Nonetheless, there is a special case in which WaitForMultipleObjects (WFMO) returns WAIT_OBJECT_0 even though all pending requests have completed simultaneously. In such cases, how can we tell whether only event[0] has completed or whether all the pending operations have completed?

From MSDN docs:

If bWaitAll is FALSE, the return value minus WAIT_OBJECT_0 indicates the lpHandles array index of the object that satisfied the wait. If more than one object became signaled during the call, this is the array index of the signaled object with the smallest index value of all the signaled objects

The following code issues a new request when the previous one completes. However, when multiple requests complete at once, I am suspecting that it might fail, since it will think only request[0] has been completed.

#define MAX_NUM_REQUEST (2)
typedef struct
{
    OVERLAPPED Overlapped;
    char       Buffer[1024];
    int        id;
} PER_IO_DATA;

DWORD WINAPI ReadThread(LPVOID lpParam)
{
    // Open handle to existing file in overlapped mode
    HANDLE hFile = CreateFile(L"C:\\test\\text.txt",                 // Test file standing in for the real device
        GENERIC_READ,          // open for reading
        0,                      // do not share
        NULL,                   // default security
        OPEN_EXISTING,             // open existing file only
        FILE_FLAG_OVERLAPPED,  // overlapped/async operation
        NULL);                  // no attr. template

    if (hFile == INVALID_HANDLE_VALUE)
    {
        std::cout << "Invalid Handle value" << std::endl;
        return -1;
    }
    else
    {
        std::cout << "CreateFile success" << std::endl;
    }

    // Create a thread pool object and associate it with a file handle.
    PTP_IO thread_pool_obj = CreateThreadpoolIo(hFile, mycompletionRoutine, NULL, NULL);
    if (thread_pool_obj == NULL)
    {
        DWORD status = GetLastError();
        std::cout << "Thread pool obj is NULL" << status << std::endl;
        return -1;
    }
    else
    {
        std::cout << "Thread pool obj created successfully" << std::endl;
    }
    
    // Issue two asynchronous requests when the thread starts.
    // The goal is to keep two requests pending with the target driver at any time:
    // when one request completes, pend a new one so that a total of two requests stay pending with the device.
    PER_IO_DATA IoDataPerThread[MAX_NUM_REQUEST] = {};
    HANDLE overlappedEvent[MAX_NUM_REQUEST];
    for (int i = 0; i < (MAX_NUM_REQUEST); i++)
    {
        // Issue an async read request
        overlappedEvent[i] = CreateEventW(NULL, FALSE, FALSE, NULL);
        IoDataPerThread[i].Overlapped.hEvent = overlappedEvent[i];
        IoDataPerThread[i].id = i;

        // Start thread pool object, as per MSDN StartThreadpoolIo must be called before issuing read/write request
        StartThreadpoolIo(thread_pool_obj);

        // Send async read file operation
        bool ret = ReadFile(hFile, (IoDataPerThread[i].Buffer), sizeof(IoDataPerThread[i].Buffer), NULL, &(IoDataPerThread[i].Overlapped));
        if (ret == 0)
        {
            DWORD status = GetLastError();
            if (status != ERROR_IO_PENDING)
            {
                std::cout << "IO operation failed" << std::endl;
                return -1;
            }
        }
    }

    while (1)
    {

        // WaitForMultipleObjects returns when a request completes.
        // We should then process the results and resubmit that ONE request so that
        // a total of two requests stay pending with the driver/device.

        DWORD dwResult = WaitForMultipleObjects(
            MAX_NUM_REQUEST,           // number of objects in array
            overlappedEvent,     // array of objects
            FALSE,       // wait for any object
            INFINITE);       // infinite wait;

        // Determine which of the two requests has completed
        if ((dwResult >= WAIT_OBJECT_0) && (dwResult <= (WAIT_OBJECT_0 + MAX_NUM_REQUEST - 1)))
        {
            IoDataPerThread[dwResult].Overlapped.hEvent = overlappedEvent[dwResult];
            IoDataPerThread[dwResult].id = dwResult;
            
            // Start thread pool object, as per MSDN StartThreadpoolIo must be called before issuing read/write request
            StartThreadpoolIo(thread_pool_obj);

            bool ret = ReadFile(hFile, (IoDataPerThread[dwResult].Buffer), 100, NULL, &(IoDataPerThread[dwResult].Overlapped));
            if (ret == 0)
            {
                DWORD status = GetLastError();
                if (status != ERROR_IO_PENDING)
                {
                    std::cout << "IO operation failed" << std::endl;
                    return -1;
                }
            }
        }
        else
        {
            std::cout << "WaitForMultipleObjects failed with error code: " << GetLastError() << std::endl;
        }
    }

    return 0;
}

IoDataPerThread[dwResult].Overlapped.hEvent = overlappedEvent[dwResult];

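No need to do this in the WFMO loop: each slot's hEvent and id were already set when the request was first issued, and they do not change between reads.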

I am suspecting that it might fail, since it will think only request[0] has been completed.

In the case where both requests have completed at the same time, WFMO will return the first index. On the next call, WFMO will return the second index. Or, if the resubmitted first request has already completed again, it will return the first index again. Eventually WFMO will return the second index, and the second request will be resubmitted. If you are truly worried that the second request may never be resent due to starvation (the first one repeatedly completing immediately), resend the I/O in mycompletionRoutine.
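Here is a single-threaded model of the MSDN rule quoted earlier. It assumes auto-reset events, which is what the code creates by passing bManualReset = FALSE to CreateEventW. When bWaitAll is FALSE, WFMO reports the lowest signaled index and resets only that one auto-reset event, so the next call reports the next one. The wait_any function is a hypothetical stand-in that returns instead of blocking.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

const int kNothingSignaled = -1; // hypothetical stand-in for "the real call would block"

// Model of WaitForMultipleObjects(count, handles, FALSE, ...) over auto-reset
// events: return the smallest signaled index and reset only that event.
int wait_any(std::vector<bool>& signaled) {
    assert(!signaled.empty());
    for (std::size_t i = 0; i < signaled.size(); ++i) {
        if (signaled[i]) {
            signaled[i] = false; // auto-reset: only the satisfying event is reset
            return static_cast<int>(i);
        }
    }
    return kNothingSignaled;
}
```

When both reads complete together, the first call returns 0 and the second returns 1. Neither completion is lost as long as index 0 is not re-signaled before every call.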

This is a poor use of the thread pool. Generally, you don’t want to use event HANDLEs to track request completion, and you certainly don’t want to issue new IO from a loop like this. It just adds overhead (event signaling and context switches) and reduces throughput by limiting IO submission to a single thread. The ReadFile call should be made from the completion routine. It is less code to write, there is no confusion about which OVERLAPPED etc. to use, and it performs better. It also avoids an allocation for each new request, as well as a nasty race condition (or a lock) over ownership of the buffer.

It also makes the shutdown sequence simpler. The design can easily be expanded to keep more than MAXIMUM_WAIT_OBJECTS (64) ReadFile calls pending concurrently, with no code changes. That may sound like a lot of pending calls, but it really isn’t.

The goal isn’t to have exactly two ReadFile calls pending at all times. The goal is to have at least one call pending at all times. That gives the driver something to complete without waiting. The longer and more uncertain the time between IO completion and resubmittal, the deeper the queue you want to maintain. It is hard to have confidence in this time because it will depend heavily on unknowable factors like hardware configuration and other workload being handled by the OS, so it is generally better to err on the side of a deeper queue
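Here is a single-threaded simulation of the completion-driven pattern. It assumes a hypothetical FakeDevice queue in place of the driver and the thread pool. Each context is allocated once. Its "completion routine" consumes the data and immediately reissues the same context. The queue depth therefore stays at however many contexts were primed, with no event handles and no WFMO limit.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <vector>

// Hypothetical per-request context (stands in for OVERLAPPED + buffer).
struct IoContext {
    int id = 0;
    std::string buffer;   // filled by the "device" on completion
    int completions = 0;
};

// Hypothetical device: FIFO of pending requests, completed one at a time.
struct FakeDevice {
    std::deque<IoContext*> pending;
    int nextPayload = 0;

    // Like ReadFile returning ERROR_IO_PENDING: the request is simply queued.
    void issue_read(IoContext* ctx) { pending.push_back(ctx); }

    // Complete the oldest request and hand it to the completion routine.
    template <typename Callback>
    void complete_one(Callback onComplete) {
        assert(!pending.empty());
        IoContext* ctx = pending.front();
        pending.pop_front();
        ctx->buffer = "data" + std::to_string(nextPayload++);
        onComplete(*this, ctx);
    }
};

// "Completion routine": process the data, then reissue the SAME context.
void on_complete(FakeDevice& dev, IoContext* ctx) {
    ++ctx->completions;
    ctx->buffer.clear();  // data consumed; the buffer is recycled, not reallocated
    dev.issue_read(ctx);  // keeps the queue depth constant
}
```

Raising the queue depth only means priming more contexts. Nothing in the completion path changes, which is the scalability point made above.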

I edited the code per your suggestion (issue ReadFile requests only from the completion routine). Is this what you meant?
We still have to issue the two initial read requests from the main thread, right? Once those two read requests are issued, further reads are issued only from the completion routine.

#include <windows.h>
#include <iostream>

#define MAX_NUM_REQUEST (2)
typedef struct
{
    OVERLAPPED Overlapped;
    char       Buffer[1024];
    int        id;
    HANDLE     hfile;
    PTP_IO     thread_pool_obj;

} PER_IO_DATA;

VOID CALLBACK mycompletionRoutine(
    PTP_CALLBACK_INSTANCE pInstance,
    PVOID pvContext,
    PVOID pOverlapped,
    ULONG IoResult,
    ULONG_PTR NumberOfBytesTransferred,
    PTP_IO pIo)
{
    if (IoResult == NO_ERROR)
    {
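        // CONTAINING_RECORD is plain pointer arithmetic, so validate the input
        // pointer and bail out instead of dereferencing NULL.
        if (pOverlapped == NULL)
        {
            std::cout << "pOverlapped is NULL" << std::endl;
            return;
        }
        PER_IO_DATA* PerIoData = CONTAINING_RECORD(pOverlapped, PER_IO_DATA, Overlapped);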

        // Do required processing(in this case , just print the id)
        std::cout << PerIoData->id << std::endl;

        SecureZeroMemory(PerIoData->Buffer, sizeof(PerIoData->Buffer));

        // Issue another read request
        StartThreadpoolIo(PerIoData->thread_pool_obj);

        bool ret = ReadFile(PerIoData->hfile, (PerIoData->Buffer), sizeof(PerIoData->Buffer), NULL, &(PerIoData->Overlapped));
        if (ret == 0)
        {
            DWORD status = GetLastError();
            if (status != ERROR_IO_PENDING)
            {
                std::cout << "IO operation failed" << std::endl;
                CancelThreadpoolIo(PerIoData->thread_pool_obj);
            }
        }
    }
    else
    {
        std::cout << "Completion routine failure" << std::endl;
    }
}

DWORD WINAPI ReadThread(LPVOID lpParam)
{
    // Open handle to existing file in overlapped mode
    //L"CONIN$" 
    //L"C:\\test\\text.txt"
    HANDLE hFile = CreateFile(L"CONIN$",                 // This refers to console input
        GENERIC_READ,          // open for reading
        0,                      // do not share
        NULL,                   // default security
        OPEN_EXISTING,             // open existing file only
        FILE_FLAG_OVERLAPPED,  // overlapped/async operation
        NULL);                  // no attr. template

    if (hFile == INVALID_HANDLE_VALUE)
    {
        std::cout << "Invalid Handle value" << std::endl;
        return -1;
    }
    else
    {
        std::cout << "CreateFile success" << std::endl;
    }

    // Create a thread pool object and associate it with a file handle.
    PTP_IO thread_pool_obj = CreateThreadpoolIo(hFile, mycompletionRoutine, NULL, NULL);
    if (thread_pool_obj == NULL)
    {
        DWORD status = GetLastError();
        std::cout << "Thread pool obj is NULL" << status << std::endl;
        return -1;
    }
    else
    {
        std::cout << "Thread pool obj created successfully" << std::endl;
    }

    // Issue two asynchronous requests when the thread starts.
    // The goal is to keep at least one request pending with the target driver at any time:
    // when one request completes, pend a new one so that a total of two requests stay pending with the device.
    PER_IO_DATA IoDataPerThread[MAX_NUM_REQUEST] = {};
    HANDLE overlappedEvent[MAX_NUM_REQUEST];
    for (int i = 0; i < (MAX_NUM_REQUEST); i++)
    {
        // Issue an async read request
        overlappedEvent[i] = CreateEventW(NULL, FALSE, FALSE, NULL);
        IoDataPerThread[i].Overlapped.hEvent = overlappedEvent[i];
        IoDataPerThread[i].id = i;
        IoDataPerThread[i].hfile = hFile;
        IoDataPerThread[i].thread_pool_obj = thread_pool_obj;

        // Start thread pool object, as per MSDN StartThreadpoolIo must be called before issuing read/write request
        StartThreadpoolIo(thread_pool_obj);

        // Send async read file operation
        bool ret = ReadFile(hFile, (IoDataPerThread[i].Buffer), sizeof(IoDataPerThread[i].Buffer), NULL, &(IoDataPerThread[i].Overlapped));
        if (ret == 0)
        {
            DWORD status = GetLastError();
            if (status != ERROR_IO_PENDING)
            {
                std::cout << "IO operation failed" << std::endl;
                CancelThreadpoolIo(thread_pool_obj);
                return -1;
            }
        }
    }

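    // Block without spinning (an empty while (1) loop burns a full CPU core).
    // IoDataPerThread lives on this thread's stack, so this thread must not
    // return while reads are still pending.
    Sleep(INFINITE);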
  
    return 0;
}
int main()
{

    HANDLE ReadThreadHandle = CreateThread(NULL, 0, ReadThread, NULL, 0, NULL);

    if (ReadThreadHandle != NULL)
    {
        std::cout << "Thread creation is successful" << std::endl;

        // Wait until thread has terminated.
        WaitForSingleObject(ReadThreadHandle, INFINITE);

        // Close thread handle upon completion.
        CloseHandle(ReadThreadHandle);

        std::cout << "Program ran successfully" << std::endl;
    }
    else
    {
        std::cout << "Thread creation failed" << std::endl;
    }

    return 0;
}

In mycompletionRoutine, you should check for and handle any error first. Then always issue a new IO, even if the previous one failed, unless some kind of close or exit condition exists.
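The ordering suggested above can be sketched as a small decision function. The error codes are defined locally so the sketch compiles without windows.h. Their values match the Win32 ones: NO_ERROR is 0 and ERROR_OPERATION_ABORTED is 995. Treating ERROR_OPERATION_ABORTED (the code pending I/O typically completes with when it is cancelled, for example because the handle was closed) as the "close" signal is an assumption for illustration. So are the shuttingDown flag and the after_completion helper.

```cpp
#include <cassert>

// Values mirror the Win32 codes; redefined here only so the sketch is portable.
const unsigned long kNoError = 0;                 // NO_ERROR
const unsigned long kErrorOperationAborted = 995; // ERROR_OPERATION_ABORTED

enum class Action { Reissue, Stop };

// Hypothetical helper: what mycompletionRoutine should do after an I/O completes.
// 1) Handle the error first (here: just count it).
// 2) Then reissue, unless the handle is being closed or the program is exiting.
Action after_completion(unsigned long ioResult, bool shuttingDown, int& errorCount) {
    assert(errorCount >= 0);
    if (ioResult != kNoError) {
        ++errorCount;  // log / report the failure before deciding anything else
    }
    if (shuttingDown || ioResult == kErrorOperationAborted) {
        return Action::Stop;   // close or exit condition: do not queue more I/O
    }
    return Action::Reissue;    // success or ordinary failure: keep a read pending
}
```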

Also, your main and ReadThread are now redundant. They are two threads that execute in lock step, so there is no point in having the second thread.

But yes, this is much better