{ 10-05-1999 10:38:00 PM > [martin on MARTIN] checked out /Reformatting according to Delphi guidelines. } { 10-05-1999 1:31:20 AM > [martin on MARTIN] update: Including winsock message in fatal error (0.4) / } { 08-05-1999 7:04:00 PM > [martin on MARTIN] checked out /Including winsock message in fatal error } { 14-04-1999 11:59:23 PM > [martin on MARTIN] update: Changing dynamic methods to virtual. (0.3) / } { 14-04-1999 11:54:16 PM > [martin on MARTIN] checked out /Changing dynamic methods to virtual. } { 14-04-1999 11:19:53 PM > [martin on MARTIN] update: Modifying code that checks whether OK to send, so that structures can be sent to the transaction manager whilst the associated socket is connecting. This means that the ONCP does not require a cache. (0.2) / } { 06-04-1999 9:14:07 PM > [martin on MARTIN] checked out /Modifying code that checks whether OK to send, so that structures can be sent to the transaction manager whilst the associated socket is connecting. This means that the ONCP does not require a cache. } { 06-04-1999 7:44:46 PM > [martin on MARTIN] checked out /Modifying class names } { 06-04-1999 1:46:40 AM > [martin on MARTIN] check in: (0.0) Initial Version / None } unit MCHTransactions; {Martin Harvey 20/10/1997 This unit implements a socket suitable for connection to a transaction manager, and it also defines the transaction manager. The transaction manager is an intermediary between the socket and the Data Object Protocol Manager The basic structure is as follows: TTransStreamSock<--->TMCHTransactionManager<---->TMCHDopManager The TMCHDopManager will be left to a separate unit. } { This unit has been slightly modified. 7/11/97 Whereas previously, the Transaction manager was created upon connection establishment, it is now created and destroyed at the same time as the socket object. However, the transaction manager now resets itself when a connection on the socket is created or destroyed. In addition, the stream socket now allows for the creation of derived classes from TMCHTransactionManager } { Yet more modifications, Martin Harvey 30/8/98 Due to various requirements, like having transaction managers that save to disks and to pipes, I'm creating a custom ancestor class that connects to a DOP, and then subclasses that work with sockets, files, pipes etc etc} interface uses MCHMemoryStream,Classes,DWinsock,SysUtils,WSAPI; type TFatalErrorEvent = procedure(Sender:TObject;Msg:string) of object; TMCHCustomTransactionManager = class private FOnReset:TNotifyEvent; FOnFatalError:TFatalErrorEvent; FOnDestroy:TNotifyEvent; FOnTransactionRecieved:TNotifyEvent; FDOPManager:TObject; protected procedure DoTransactionRecieved;virtual; procedure DoFatalError(Msg:string);virtual; procedure DoDestroy;virtual; function GetActive:boolean;virtual;abstract; public destructor Destroy;override; property DOPManager:TObject read FDOPManager write FDOPManager; property OnTransactionRecieved:TNotifyEvent read FOnTransactionRecieved write FOnTransactionRecieved; {informs user when it is about to be destroyed} property OnDestroy:TNotifyEvent read FOnDestroy write FOnDestroy; {informs user when a reset event is about to occur} property OnReset:TNotifyEvent read FOnReset write FOnReset; property OnFatalError:TFatalErrorEvent read FOnFatalError write FOnFatalError; property Active:boolean read GetActive; constructor Create; procedure WriteTransactionFromStream(ExternalIn:TStream);virtual;abstract; procedure ReadTransactionToStream(ExternalOut:TStream);virtual;abstract; procedure Reset;virtual; published end; TMCHTransactionManager = class; {forward declaration} TCustomManagerClass = class of TMCHCustomTransactionManager; TTransStreamSock = class(TStreamSocket) private FManager:TMCHCustomTransactionManager; protected {these methods normally call the parent component and allow delegation through the parent component to the form doing the delegation. I am overriding them to provide a direct way of letting the transaction manager get to the data first.} procedure DoAccept;override; procedure DoConnect;override; procedure DoDisconnect;override; procedure DoReject;override; procedure DoTimeout;override; procedure DoWrite;override; procedure DoRead;override; public property Manager:TMCHCustomTransactionManager read FManager write FManager; {connection to the manager object to allow direct access to it} constructor Create(AParent:TSockCtrl);override; {note that set manager class should only be called immediately after the socket is created} destructor Destroy;override; published end; TMCHTransactionManager = class(TMCHCustomTransactionManager) private FTransSock:TTransStreamSock; FInputStream, FOutputStream: TMCHMemoryStream; FReadingTransaction:boolean; FSocketReady:boolean; FReadTransactionStart, FReadTransactionEnd:integer; FWriteSocketOffset:integer; protected {offset of current transaction in read stream} property ReadTransactionStart:integer read FReadTransactionStart write FReadTransactionStart; {projected end of current transaction} property ReadTransactionEnd:integer read FReadTransactionEnd write FReadTransactionEnd; {position of writing data to socket} property WriteSocketOffset:integer read FWriteSocketOffset write FWriteSocketOffset; {stream properties} property InputStream:TMCHMemoryStream read FInputStream write FInputStream; property OutputStream:TMCHMemoryStream read FOutputStream write FOutputStream; {internal procedures} procedure TrimReadStream; procedure TrimWriteStream; {write to socket puts as many bytes as possible from ouput stream to socket, and returns number put} function WriteToSocket(Sock:TStreamSocket;Stream:TStream;pos,size:integer):integer; { read from socket reads as many bytes as possible to the end of the input stream, and returns how many bytes read} function ReadFromSocket(Sock:TStreamSocket;Stream:TStream):integer; function GetSocketReady:boolean; procedure ExtractTransactionsFromStream; function GetActive:boolean;override; public {user accessible properties} {owning socket} property TransSock:TTransStreamSock read FTransSock write FTransSock; {Reading side of things} {user event notifying that a transaction is ready} {note: this is the only chance that external code has of picking up the transaction, pointers are automatically incremented afterwards! (This may be changed later)} property OnTransactionRecieved; {informs user when it is about to be destroyed} property OnDestroy; {informs user when a reset event is about to occur} property OnReset; { an unfinished transaction is in the read stream } { This does not preclude a very short transaction existing. (ie header is too short to read length} { This property doesn't have a write specifier, so external objects can't modify it} property ReadingTransaction:boolean read FReadingTransaction; { The socket ready property is used to ensure that only one send operation is called for every OnWrite event} property SocketReady:boolean read GetSocketReady; property Active; {constructor and destructor. These do not need to be called manually, since the owning socket will take care of this } constructor Create; destructor Destroy;override; {user methods for writing transactions} { The input stream should contain all the data for one transaction} { Note that the outside agent owns the external stream} procedure WriteTransactionFromStream(ExternalIn:TStream);override; {user methods for reading transactions} {The output stream contains a copy of the transaction data This should be called externally when an OnTransactionRecieved event occurs} {Note that the outside agent owns the external stream} procedure ReadTransactionToStream(ExternalOut:TStream);override; {methods for owning socket to call. The user should *not* call these!!} {writing side of things} procedure WriteDataRemainder; {reading side} procedure ReadNewData; { a reset procedure to be used when the connection underlying the socket has been closed} procedure Reset;override; published property OnFatalError; end; implementation uses MCHDOPManager,Dialogs; constructor TMCHCustomTransactionManager.Create; begin inherited Create; DOPManager := TMCHDopManager.Create; (DOPManager as TMCHDopManager).DOPTransactionManager := Self; end; destructor TMCHCustomTransactionManager.Destroy; begin DoDestroy; DOPManager.Free; DOPManager := nil; inherited Destroy; end; procedure TMCHCustomTransactionManager.DoFatalError(Msg:string); begin if Assigned(FOnFatalError) then FOnFatalError(Self,Msg); end; procedure TMCHCustomTransactionManager.Reset; begin if Assigned(DOPManager) then (DOPManager as TMCHDopManager).Reset; if Assigned(FOnReset) then FOnReset(Self); end; procedure TMCHCustomTransactionManager.DoTransactionRecieved; begin if (Assigned(FOnTransactionRecieved)) then FOnTransactionRecieved(Self); try if Assigned(DOPManager) then (DOPManager as TMCHDopManager).GetIncomingTransaction(Self); except on E:ENetworkError do begin ShowMessage('Exception raised on remote machine, ' + E.RemoteExceptionName + ' ' + E.Message); end; end; end; procedure TMCHCustomTransactionManager.DoDestroy; begin if (Assigned(FOnDestroy)) then FOnDestroy(Self); end; procedure TTransStreamSock.DoAccept; begin if LastError = 0 then begin if Assigned(Manager) then Manager.Reset; end; inherited DoAccept; end; procedure TTransStreamSock.DoConnect; begin {Note that this has been modified. We now only reset the transaction manager if the connection *fails*} if LastError <> 0 then begin if Assigned(Manager) then Manager.Reset; end; inherited DoConnect; end; procedure TTransStreamSock.DoDisconnect; begin if Assigned(Manager) then Manager.Reset; inherited DoDisconnect; end; procedure TTransStreamSock.DoReject; begin if Assigned(Manager) then Manager.Reset; inherited DoReject; end; procedure TTransStreamSock.DoTimeOut; begin if Assigned(Manager) then Manager.Reset; inherited DoTimeOut; end; procedure TTransStreamSock.DoWrite; begin if Assigned(Manager) then (Manager as TMCHTransactionManager).WriteDataRemainder; inherited DoWrite; end; procedure TTransStreamSock.DoRead; begin if Assigned(Manager) then (Manager as TMCHTransactionManager).ReadNewData; inherited DoRead; end; destructor TTransStreamSock.Destroy; begin if Assigned(Manager) then begin Manager.Free; Manager := nil; end; inherited Destroy; end; constructor TTransStreamSock.Create(AParent:TSockCtrl); begin inherited Create(AParent); Manager := TMCHTransactionManager.Create; (Manager as TMCHTransactionManager).TransSock := Self; end; function TMCHTransactionManager.GetSocketReady:boolean; begin result := FTransSock.Connected and FSocketReady; end; {$WARNINGS OFF} function TMCHTransactionManager.ReadFromSocket(Sock:TStreamSocket;Stream:TStream):integer; const blocksize = 4096; var databuf:array[0..blocksize - 1] of byte; TotalBytesRead,BytesRead:integer; begin Stream.Seek(0,SoFromEnd); {seek to the end of the input stream} TotalBytesRead := 0; repeat try BytesRead := Sock.Recv(databuf,blocksize); except on E:ESockError do begin DoFatalError(E.Message); BytesRead := 0; end; end; Stream.WriteBuffer(Databuf,BytesRead); TotalBytesRead := TotalBytesRead + BytesRead; until BytesRead < blocksize; result := TotalBytesRead; end; {$WARNINGS ON} constructor TMCHTransactionManager.Create; begin inherited Create; FInputStream := TMCHMemoryStream.Create; FOutputStream := TMCHMemoryStream.Create; end; destructor TMCHTransactionManager.Destroy; begin FInputStream.Free; FOutputStream.Free; inherited Destroy; end; function TMCHTransactionManager.GetActive:boolean; begin result := true; if (WriteSocketOffset = OutputStream.Size) and (ReadTransactionStart = InputStream.Size) and (not ReadingTransaction) then result := false; end; procedure TMCHTransactionManager.Reset; {This procedure resets all pointers and clears the stream} begin inherited Reset; ReadTransactionStart := 0; ReadTransactionEnd := 0; WriteSocketOffset := 0; FReadingTransaction := false; InputStream.SetSize(0); OutputStream.SetSize(0); end; procedure TMCHTransactionManager.ReadNewData; var TransLength:integer; begin ReadFromSocket(TransSock,InputStream); if (not (ReadingTransaction)) then { No transactions pending, Read Start & Read End point to same place} begin if (InputStream.Size >= (ReadTransactionEnd + SizeOf(TransLength))) then begin InputStream.Seek(ReadTransactionEnd,soFromBeginning); InputStream.ReadBuffer(TransLength,SizeOf(TransLength)); ReadTransactionStart := InputStream.Position; ReadTransactionEnd := ReadTransactionStart + TransLength; FReadingTransaction := true; end; end; ExtractTransactionsFromStream; end; procedure TMCHTransactionManager.ExtractTransactionsFromStream; var TransLength:integer; begin {do we have a complete transaction?} while ((ReadTransactionEnd <= InputStream.Size) and ReadingTransaction) do begin {we have a complete pending transaction} DoTransactionRecieved; {this will pick the transaction up if the user assigns a handler} {now advance pointers to the next transaction} ReadTransactionStart := ReadTransactionEnd; if (InputStream.Size >= ReadTransactionStart + SizeOf(TransLength)) then begin {we at least have the integer transaction length} InputStream.Seek(ReadTransactionEnd,soFromBeginning); InputStream.ReadBuffer(TransLength,SizeOf(TransLength)); ReadTransactionStart := InputStream.Position; ReadTransactionEnd := ReadTransactionStart + TransLength; {now we know where the transaction ends and the pointers are valid} end else {we don't have anything to work with} FReadingTransaction := false; end; TrimReadStream; end; procedure TMCHTransactionManager.TrimReadStream; const MinStreamSize = 1024; var NewStream:TMCHMemoryStream; begin {General policy here: If more than 4/5 of the stream is not used, then clear it up. In all cases, all data before ReadTransactionStart can be discarded if stream less than 1k, then don't bother. } if (((InputStream.Size - ReadTransactionStart) * 5) < InputStream.Size) and (InputStream.Size > MinStreamSize) then begin NewStream := TMCHMemoryStream.Create; { do the work} {copy data from old to new stream} InputStream.Seek(ReadTransactionStart,soFromBeginning); if (InputStream.Size - ReadTransactionStart) > 0 then NewStream.CopyFrom(InputStream,(InputStream.Size - ReadTransactionStart)); {Swap stream ownership and destroy old stream} InputStream.Free; InputStream := NewStream; {reset pointers accordingly} ReadTransactionEnd := ReadTransactionEnd - ReadTransactionStart; ReadTransactionStart := 0; end; end; procedure TMCHTransactionManager.ReadTransactionToStream(ExternalOut:TStream); {This will copy the current transaction, but will not modify any pointers} begin InputStream.Seek(ReadTransactionStart,soFromBeginning); ExternalOut.CopyFrom(InputStream,ReadTransactionEnd - ReadTransactionStart); end; {$WARNINGS OFF} function TMCHTransactionManager.WriteToSocket(Sock:TStreamSocket;Stream:TStream;pos,size:integer):integer; const blocksize = 4096; var databuf:array[0..blocksize - 1] of byte; TotalBytesWritten,BytesWritten,BlockBytesToWrite,BytesToWrite:integer; {write as much as we can to the socket until it's full} begin Stream.Seek(pos,soFromBeginning); BytesToWrite := size; TotalBytesWritten := 0; repeat if BytesToWrite > blocksize then BlockBytesToWrite := blocksize else BlockBytesToWrite := BytesToWrite; Stream.ReadBuffer(databuf,BlockBytesToWrite); try BytesWritten := Sock.Send(databuf,BlockBytesToWrite); except on E:ESockError do begin DoFatalError(E.Message); BytesWritten := 0; end; end; TotalBytesWritten := TotalBytesWritten + BytesWritten; BytesToWrite := BytesToWrite - BytesWritten until (BytesToWrite = 0) or (BytesWritten < BlockBytesToWrite); result := TotalBytesWritten; end; {$WARNINGS ON} procedure TMCHTransactionManager.WriteDataRemainder; var BytesWritten:integer; BytesToWrite:integer; begin FSocketReady := true; {socket is ready for more data to be written} BytesToWrite := OutputStream.Size - WriteSocketOffset; if BytesToWrite > 0 then begin {write as much as we can} BytesWritten := WriteToSocket(TransSock,OutputStream,WriteSocketOffset,BytesToWrite); WriteSocketOffset := WriteSocketOffset + BytesWritten; if BytesWritten < BytesToWrite then FSocketReady := false; { we'll get a callback ready for more data} end; TrimWriteStream; end; procedure TMCHTransactionManager.TrimWriteStream; const MinStreamSize = 1024; var NewStream:TMCHMemoryStream; begin {Same general policy as for trimming the read stream} if (((OutputStream.Size - WriteSocketOffset) * 5) < OutputStream.Size) and (OutputStream.Size > MinStreamSize) then begin NewStream := TMCHMemoryStream.Create; try {copy} OutputStream.Seek(WriteSocketOffset,soFromBeginning); if (OutputStream.Size - WriteSocketOffset) > 0 then NewStream.CopyFrom(OutputStream,(OutputStream.Size - WriteSocketOffset)); {Swap ownership & destroy} OutputStream.Free; OutputStream := TMCHMemoryStream.Create; OutputStream.LoadFromStream(NewStream); WriteSocketOffset := 0; finally NewStream.Free; end; end; end; procedure TMCHTransactionManager.WriteTransactionFromStream(ExternalIn:TStream); var Length:integer; begin { write length data to output stream } Length := ExternalIn.Size; OutputStream.Seek(0,soFromEnd); OutputStream.WriteBuffer(Length,SizeOf(Length)); ExternalIn.Seek(0,soFromBeginning); OutputStream.CopyFrom(ExternalIn,ExternalIn.Size); if SocketReady then WriteDataRemainder; {only force a send if the socket is empty and connected} end; end.