{ 10-05-1999 10:37:03 PM > [martin on MARTIN] checked out /Reformatting according to Delphi guidelines. } { 06-04-1999 7:49:40 PM > [martin on MARTIN] checked out /Modifying Class Names } unit MCHPipeThreads; {Martin Harvey 7/11/98} {This unit gives us a base pipe thread type with some common support for error tracking} interface uses Classes,MCHPipeTypes,Windows,MCHMemoryStream; type TMCHPipeThread = class(TThread) private FOnTerminate:TNotifyEvent; protected FTermReason:TMCHError; public procedure Execute;override; published property TermReason:TMCHError read FTermReason; property OnTerminate:TNotifyEvent read FOnTerminate write FOnTerminate; end; TMCHPipeWriterThread = class(TMCHPipeThread) private FDataMutex,FIdleSemaphore:THandle; FPipeWriteHandle:TMCHHandle; FData:TMCHMemoryStream; FWriteIdx:integer; protected public constructor Create(CreateSuspended:boolean); procedure Execute;override; destructor Destroy;override; function WriteData(InStream:TStream):integer; {returns bytes written = InStream.Size} property PipeWriteHandle:TMCHHandle read FPipeWriteHandle write FPipeWriteHandle; end; TMCHPipeReaderThread = class(TMCHPipeThread) private { Private declarations } FDataMutex:THandle; FPipeReadHandle:TMCHHandle; FData:TMCHMemoryStream; FOnDataRecieved:TNotifyEvent; FOnConnect:TNotifyEvent; protected public constructor Create(CreateSuspended:boolean); procedure Execute;override; destructor Destroy;override; function ReadData(OutStream:TStream):integer; {returns bytes read} property OnDataRecieved:TNotifyEvent read FOnDataRecieved write FOnDataRecieved; property PipeReadHandle:TMCHHandle read FPipeReadHandle write FPipeReadHandle; property OnConnect:TNotifyEvent read FOnConnect write FOnConnect; end; implementation uses MCHPipeInterface2; const BufSize = 4096; type DataBuf = array[0..BufSize - 1] of integer; procedure TMCHPipeThread.Execute; begin if Assigned(FOnTerminate) then FOnTerminate(Self); end; constructor TMCHPipeReaderThread.Create(CreateSuspended:boolean); begin inherited Create(CreateSuspended); FDataMutex := CreateMutex(nil,false,nil); FData := TMCHMemoryStream.Create; end; destructor TMCHPipeReaderThread.Destroy; begin Terminate; if Suspended then Resume; WaitFor; FData.Free; CloseHandle(FDataMutex); inherited Destroy; end; function TMCHPipeReaderThread.ReadData(OutStream:TStream):integer; begin WaitForSingleObject(FDataMutex,INFINITE); try OutStream.Seek(0,soFromEnd); FData.Seek(0,soFromBeginning); Result := FData.Size; OutStream.CopyFrom(FData,FData.Size); FData.Clear; finally ReleaseMutex(FDataMutex); end; end; procedure TMCHPipeReaderThread.Execute; var Buffer:DataBuf; BytesToRead,BytesThisTime:integer; begin FTermReason := WaitForPeer(FPipeReadHandle); if FTermReason <> meOK then terminate else if Assigned(FOnConnect) then FOnConnect(Self); while not terminated do begin FTermReason := PeekData(FPipeReadHandle,BytesToRead); if FTermReason <> meOK then terminate; if (not terminated) then begin if BytesToRead <= 0 then begin {Callback handler should implement lazy async notification} if Assigned(FOnDataRecieved) then FOnDataRecieved(Self); BytesToRead := 1; end; if BytesToRead > BufSize then BytesThisTime := BufSize else BytesThisTime := BytesToRead; FTermReason := MCHPipeInterface2.ReadData(FPipeReadHandle,Buffer,BytesThisTime); if FTermReason <> meOK then terminate else begin WaitForSingleObject(FDataMutex,INFINITE); FData.Seek(0,soFromEnd); FData.WriteBuffer(Buffer,BytesThisTime); ReleaseMutex(FDataMutex); end; end; end; inherited Execute; end; constructor TMCHPipeWriterThread.Create(CreateSuspended:boolean); begin inherited Create(CreateSuspended); FDataMutex := CreateMutex(nil,false,nil); FIdleSemaphore := CreateSemaphore(nil,0,High(Integer),nil); FData := TMCHMemoryStream.Create; end; destructor TMCHPipeWriterThread.Destroy; begin Terminate; ReleaseSemaphore(FIdleSemaphore,1,nil); if Suspended then Resume; WaitFor; FData.Free; CloseHandle(FDataMutex); inherited Destroy; end; function TMCHPipeWriterThread.WriteData(InStream:TStream):integer; begin InStream.Seek(0,soFromBeginning); WaitForSingleObject(FDataMutex,INFINITE); try Result := InStream.Size; FData.Seek(0,soFromEnd); FData.CopyFrom(InStream,InStream.Size); finally ReleaseMutex(FDataMutex); end; ReleaseSemaphore(FIdleSemaphore,1,nil); end; procedure TMCHPipeWriterThread.Execute; var Buf:DataBuf; BytesThisTime,BytesToWrite:integer; begin while not (terminated) do begin WaitForSingleObject(FDataMutex,INFINITE); BytesToWrite := FData.Size - FWriteIdx; ReleaseMutex(FDataMutex); while (BytesToWrite > 0) and (not terminated) do begin if BytesToWrite > BufSize then BytesThisTime := BufSize else BytesThisTime := BytesToWrite; WaitForSingleObject(FDataMutex,INFINITE); FData.Seek(FWriteIdx,soFromBeginning); FData.ReadBuffer(Buf,BytesThisTime); ReleaseMutex(FDataMutex); {Note that we should not block when we have the mutex!} FTermReason := MCHPipeInterface2.WriteData(FPipeWriteHandle,Buf,BytesThisTime); if (FTermReason = meOK) then begin BytesToWrite := BytesToWrite - BytesThisTime; FWriteIdx := FWriteIdx + BytesThisTime; end else terminate; end; if (not (terminated)) then begin WaitForSingleObject(FDataMutex,INFINITE); {Cannot be sure that the stream hasn't been written to in the meantime!} {When the expression below is false, the wait on the idle semaphore will not block, so the stream should not get unnecessarily large} if FWriteIdx = FData.Size then begin FData.Clear; FWriteIdx := 0; end; ReleaseMutex(FDataMutex); WaitForSingleObject(FIdleSemaphore,INFINITE); end; end; inherited Execute; end; end.