From 0aac0d2464f4e8f64fc20b054ac683ba2b4542ac Mon Sep 17 00:00:00 2001 From: Frederic Kehrein Date: Tue, 18 Feb 2020 20:56:08 +0100 Subject: [PATCH] Restructuring and adding LPK --- .gitignore | 1 + Readme.md | 18 +- .../chatClient.html | 0 example.lpi => examples/chatServer.lpi | 165 +- example.lpr => examples/chatServer.pas | 202 +-- examples/makefile | 13 + src/websocketserver.pas | 641 ++++++++ src/wsmessages.pas | 97 ++ src/wsstream.pas | 654 ++++++++ utilities.pas => src/wsutils.pas | 2 +- websocket.pas | 1357 ----------------- websockets.lpk | 55 + websockets.pas | 21 + 13 files changed, 1672 insertions(+), 1554 deletions(-) rename example_client.html => examples/chatClient.html (100%) rename example.lpi => examples/chatServer.lpi (62%) rename example.lpr => examples/chatServer.pas (92%) create mode 100644 examples/makefile create mode 100644 src/websocketserver.pas create mode 100644 src/wsmessages.pas create mode 100644 src/wsstream.pas rename utilities.pas => src/wsutils.pas (95%) delete mode 100644 websocket.pas create mode 100644 websockets.lpk create mode 100644 websockets.pas diff --git a/.gitignore b/.gitignore index adb96dd..9bc2e7f 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ # Lazarus compiler-generated binaries (safe to delete) +bin/ *.exe *.dll *.so diff --git a/Readme.md b/Readme.md index 71d7524..387c8aa 100644 --- a/Readme.md +++ b/Readme.md @@ -4,10 +4,10 @@ It is fully based upon the fcl `ssockets` unit and therefore independent from an It can thereby easiely built only using fpc without Lazarus or complicated makefiles. ## Installation -There is currently no Lazarus package, as this is in quite an early phase. To use, simply add `websockets.pas` and `utilities.pas` to your project and start hacking +There is a Lazarus package file (`websockets.lpk`) in this repository that can be used for Lazarus projects. An other option is to simply add the `src` directory to your unit search path. ## Usage & Functionality -For a simple example server see the `example.lpi` Lazarus project. +For a simple example server see the `chatServer.pas` in the examples directory. ### Setting up the Server To create a `TWebSocketServer` the constructor mirrors the constructor of `ssockets.TInetServer` and can be called with either only a port, or an address, port and optionally a `ssockets.TSocketHandler` (e.g. to provide TLS support). So in order to create a simple WebSocket listener on port 8080 we can simply use: @@ -60,7 +60,7 @@ It will then start accepting clients on this thread until `Server.Stop` is calle ### Recieving and Sending Messages All the communication is done via the `TWebsocketCommunincator` class. It provides three basic methods for communication: ``` -procedure RecieveMessages; +procedure RecieveMessage; function GetUnprocessedMessages(const MsgList: TWebsocketMessageOwnerList): integer; function WriteMessage(MessageType: TWebsocketMessageType = wmtString; MaxFrameLength: int64 = 125): TWebsocketMessageStream; @@ -69,12 +69,14 @@ function WriteMessage(MessageType: TWebsocketMessageType = wmtString; Lastly we have `WriteMessage` which creates a `TWebsocketMessageStream` for us to send messages to the client. These should be either string (`wmtString`), binary (`wmtBinary`) or ping (`wmtPing`) messages. After a ping, the responding pong will be recieved by RecieveMessages and can be processed by the user as any other message. +While `RecieveMessage` is blocking until at least one message is read. But it is implemented thread safe, meaning you can send messages while reading, without problems. This is archived by locking, while in general not nessecary as reading and writing can technically be done in parallel, but to avoid complications we lock stream access + Besides those the `TWebsocketCommunincator` class also provides two properties: ``` -property SocketStream: TSocketStream; +property SocketStream: TLockedSocketStream; property Open: boolean; ``` -`SocketStream` grants access to the raw underlying connection and `Open` contains the state of the stream. At the current point in time the stream access is not locked, as the reading method `RecieveMessages` is blocking. So it can theoretically happen that the stream gets closed while writing to it. If this happen, pray for an error to arise, because than anything could happen. +`SocketStream` grants access to the raw underlying connection and `Open` can be used to check whether the stream is still open. Lastly the communicator provides two events: ``` @@ -84,8 +86,8 @@ property OnClose: TNotifyEvent; `OnRecieveMessage` is triggered when `RecieveMessages` adds a new message to the message queue and `OnClose` will be triggered when the stream closes, either due to an abrupt disconnect of the underlying TCP stream (detected by a stream reding error while recieving Messages) or after sending the close message. It will be called before the `TSocketStream` object will be destroyed, so you still have access to it, e.g. to its `RemoteAddress` attribute to identify the client. Both of these events are fired in the context of the thread discovering them, most likely the thread calling `RecieveMessages`. Any cross thread accesses need to be secured by the user, either using `TThread.Queue`, `TThread.Synchronize`, `critical sections` or any other method of handling inter-thread communications. ## Example -The example server can be built either by opening the Lazarus project (`example.lpi`) and building it with the IDE or using fpc directly by calling +The `chatServer` example can be built in multiple ways. Either by opening the Lazarus project (`examples/chatServer.lpi`) and building it with the IDE, or by using make in the examples directory, or by using the fpc directly via: ``` -$> fpc ./example.lpr +$> fpc -Fu ../src chatServer.pas ``` -The client for this is the html document `example_client.html` and should be usable with any modern browser. The example is a simple chat that lets the user input text messages to send to the other party, and recieve their messages asynchronously. You can try to connect with multiple clients at once to the server as it uses a threaded handler, but isn't built for reading more than a message for one client at a time, so funny things might happen. +The client for this is the html document `chatClient.html` and should be usable with any modern browser. The example is a simple chat that lets the user input text messages to send to the other party, and recieve their messages asynchronously. You can try to connect with multiple clients at once to the server as it uses a threaded handler, but isn't built for reading more than a message for one client at a time, so funny things might happen. diff --git a/example_client.html b/examples/chatClient.html similarity index 100% rename from example_client.html rename to examples/chatClient.html diff --git a/example.lpi b/examples/chatServer.lpi similarity index 62% rename from example.lpi rename to examples/chatServer.lpi index 2399ffb..d4facb7 100644 --- a/example.lpi +++ b/examples/chatServer.lpi @@ -1,87 +1,78 @@ - - - - - - - - - - - - - - - <UseAppBundle Value="False"/> - <ResourceType Value="res"/> - </General> - <BuildModes Count="1"> - <Item1 Name="Default" Default="True"/> - </BuildModes> - <PublishOptions> - <Version Value="2"/> - <UseFileFilters Value="True"/> - </PublishOptions> - <RunParams> - <FormatVersion Value="2"/> - </RunParams> - <Units Count="3"> - <Unit0> - <Filename Value="example.lpr"/> - <IsPartOfProject Value="True"/> - </Unit0> - <Unit1> - <Filename Value="websocket.pas"/> - <IsPartOfProject Value="True"/> - <UnitName Value="WebSocket"/> - </Unit1> - <Unit2> - <Filename Value="utilities.pas"/> - <IsPartOfProject Value="True"/> - </Unit2> - </Units> - </ProjectOptions> - <CompilerOptions> - <Version Value="11"/> - <PathDelim Value="\"/> - <Target> - <Filename Value="example"/> - </Target> - <SearchPaths> - <IncludeFiles Value="$(ProjOutDir)"/> - <UnitOutputDirectory Value="lib\$(TargetCPU)-$(TargetOS)"/> - </SearchPaths> - <Parsing> - <SyntaxOptions> - <IncludeAssertionCode Value="True"/> - </SyntaxOptions> - </Parsing> - <CodeGeneration> - <Checks> - <IOChecks Value="True"/> - <RangeChecks Value="True"/> - <OverflowChecks Value="True"/> - <StackChecks Value="True"/> - </Checks> - <VerifyObjMethodCallValidity Value="True"/> - </CodeGeneration> - <Linking> - <Debugging> - <UseHeaptrc Value="True"/> - </Debugging> - </Linking> - </CompilerOptions> - <Debugging> - <Exceptions Count="3"> - <Item1> - <Name Value="EAbort"/> - </Item1> - <Item2> - <Name Value="ECodetoolError"/> - </Item2> - <Item3> - <Name Value="EFOpenError"/> - </Item3> - </Exceptions> - </Debugging> -</CONFIG> +<?xml version="1.0" encoding="UTF-8"?> +<CONFIG> + <ProjectOptions> + <Version Value="11"/> + <PathDelim Value="\"/> + <General> + <Flags> + <MainUnitHasCreateFormStatements Value="False"/> + <MainUnitHasTitleStatement Value="False"/> + <MainUnitHasScaledStatement Value="False"/> + </Flags> + <SessionStorage Value="InProjectDir"/> + <MainUnit Value="0"/> + <Title Value="chatServer"/> + <UseAppBundle Value="False"/> + <ResourceType Value="res"/> + </General> + <BuildModes Count="1"> + <Item1 Name="Default" Default="True"/> + </BuildModes> + <PublishOptions> + <Version Value="2"/> + <UseFileFilters Value="True"/> + </PublishOptions> + <RunParams> + <FormatVersion Value="2"/> + <Modes Count="0"/> + </RunParams> + <Units Count="5"> + <Unit0> + <Filename Value="chatServer.pas"/> + <IsPartOfProject Value="True"/> + </Unit0> + <Unit1> + <Filename Value="..\src\websocketserver.pas"/> + <IsPartOfProject Value="True"/> + <UnitName Value="WebSocketServer"/> + </Unit1> + <Unit2> + <Filename Value="..\src\wsmessages.pas"/> + <IsPartOfProject Value="True"/> + </Unit2> + <Unit3> + <Filename Value="..\src\wsstream.pas"/> + <IsPartOfProject Value="True"/> + </Unit3> + <Unit4> + <Filename Value="..\src\wsutils.pas"/> + <IsPartOfProject Value="True"/> + </Unit4> + </Units> + </ProjectOptions> + <CompilerOptions> + <Version Value="11"/> + <PathDelim Value="\"/> + <Target> + <Filename Value="bin/chatServer"/> + </Target> + <SearchPaths> + <IncludeFiles Value="$(ProjOutDir)"/> + <OtherUnitFiles Value="..\src"/> + <UnitOutputDirectory Value="lib\$(TargetCPU)-$(TargetOS)"/> + </SearchPaths> + </CompilerOptions> + <Debugging> + <Exceptions Count="3"> + <Item1> + <Name Value="EAbort"/> + </Item1> + <Item2> + <Name Value="ECodetoolError"/> + </Item2> + <Item3> + <Name Value="EFOpenError"/> + </Item3> + </Exceptions> + </Debugging> +</CONFIG> diff --git a/example.lpr b/examples/chatServer.pas similarity index 92% rename from example.lpr rename to examples/chatServer.pas index 6c38dd6..333de60 100644 --- a/example.lpr +++ b/examples/chatServer.pas @@ -1,101 +1,101 @@ -program example; - -{$mode objfpc}{$H+} - -uses {$IFDEF UNIX} - cthreads, {$ENDIF} - Classes, - SysUtils, - WebSocket, - utilities, - Sockets { you can add units after this }; - -type - - { TSocketHandler } - - TSocketHandler = class(TThreadedWebsocketHandler) - function Accept(const ARequest: TRequestData; - const ResponseHeaders: TStrings): boolean; override; - procedure DoHandleCommunication(ACommunication: TWebsocketCommunincator); - override; - private - procedure ConnectionClosed(Sender: TObject); - procedure MessageRecieved(Sender: TObject); - end; - -var - socket: TWebSocketServer; - - { TSocketHandler } - - function TSocketHandler.Accept(const ARequest: TRequestData; - const ResponseHeaders: TStrings): boolean; - begin - Result := True; - end; - - procedure TSocketHandler.DoHandleCommunication( - ACommunication: TWebsocketCommunincator); - var - str: string; - begin - WriteLn('Connected to ', ACommunication.SocketStream.RemoteAddress.Address); - ACommunication.OnRecieveMessage := @MessageRecieved; - ACommunication.OnClose := @ConnectionClosed; - while ACommunication.Open do - begin - ReadLn(str); - if not ACommunication.Open then - Break; // could be closed by the time ReadLn takes - with ACommunication.WriteMessage do - try - WriteRaw(str); - finally - Free; - end; - WriteLn('Message to ', ACommunication.SocketStream.RemoteAddress.Address, - ': ', str); - end; - socket.Stop(True); - end; - - procedure TSocketHandler.ConnectionClosed(Sender: TObject); - var - Comm: TWebsocketCommunincator; - begin - Comm := TWebsocketCommunincator(Sender); - WriteLn('Connection to ', Comm.SocketStream.RemoteAddress.Address, ' closed'); - end; - - procedure TSocketHandler.MessageRecieved(Sender: TObject); - var - Messages: TWebsocketMessageOwnerList; - m: TWebsocketMessage; - Comm: TWebsocketCommunincator; - begin - Comm := TWebsocketCommunincator(Sender); - Messages := TWebsocketMessageOwnerList.Create(True); - try - Comm.GetUnprocessedMessages(Messages); - for m in Messages do - if m is TWebsocketStringMessage then - begin - WriteLn('Message from ', Comm.SocketStream.RemoteAddress.Address, - ': ', TWebsocketStringMessage(m).Data); - end; - finally - Messages.Free; - end; - end; - -begin - socket := TWebSocketServer.Create(8080); - try - socket.FreeHandlers := True; - socket.RegisterHandler('*', '*', TSocketHandler.Create, True, True); - socket.Start; - finally - socket.Free; - end; -end. +program chatServer; + +{$mode objfpc}{$H+} + +uses {$IFDEF UNIX} + cthreads, {$ENDIF} + classes, + wsutils, + wsmessages, + wsstream, + websocketserver; + +type + + { TSocketHandler } + + TSocketHandler = class(TThreadedWebsocketHandler) + function Accept(const ARequest: TRequestData; + const ResponseHeaders: TStrings): boolean; override; + procedure DoHandleCommunication(ACommunication: TWebsocketCommunincator); + override; + private + procedure ConnectionClosed(Sender: TObject); + procedure MessageRecieved(Sender: TObject); + end; + +var + socket: TWebSocketServer; + + { TSocketHandler } + + function TSocketHandler.Accept(const ARequest: TRequestData; + const ResponseHeaders: TStrings): boolean; + begin + Result := True; + end; + + procedure TSocketHandler.DoHandleCommunication( + ACommunication: TWebsocketCommunincator); + var + str: string; + begin + WriteLn('Connected to ', ACommunication.SocketStream.RemoteAddress.Address); + ACommunication.OnRecieveMessage := @MessageRecieved; + ACommunication.OnClose := @ConnectionClosed; + while ACommunication.Open do + begin + ReadLn(str); + if not ACommunication.Open then + Break; // could be closed by the time ReadLn takes + with ACommunication.WriteMessage do + try + WriteRaw(str); + finally + Free; + end; + WriteLn('Message to ', ACommunication.SocketStream.RemoteAddress.Address, + ': ', str); + end; + socket.Stop(True); + end; + + procedure TSocketHandler.ConnectionClosed(Sender: TObject); + var + Comm: TWebsocketCommunincator; + begin + Comm := TWebsocketCommunincator(Sender); + WriteLn('Connection to ', Comm.SocketStream.RemoteAddress.Address, ' closed'); + end; + + procedure TSocketHandler.MessageRecieved(Sender: TObject); + var + Messages: TWebsocketMessageOwnerList; + m: TWebsocketMessage; + Comm: TWebsocketCommunincator; + begin + Comm := TWebsocketCommunincator(Sender); + Messages := TWebsocketMessageOwnerList.Create(True); + try + Comm.GetUnprocessedMessages(Messages); + for m in Messages do + if m is TWebsocketStringMessage then + begin + WriteLn('Message from ', Comm.SocketStream.RemoteAddress.Address, + ': ', TWebsocketStringMessage(m).Data); + end; + finally + Messages.Free; + end; + end; + +begin + socket := TWebSocketServer.Create(8080); + try + socket.FreeHandlers := True; + socket.RegisterHandler('*', '*', TSocketHandler.Create, True, True); + socket.Start; + finally + socket.Free; + end; +end. diff --git a/examples/makefile b/examples/makefile new file mode 100644 index 0000000..e90158c --- /dev/null +++ b/examples/makefile @@ -0,0 +1,13 @@ +all: bin bin/chatServer + +FPC_OPT=-g -gh -Ci -Cr -Co -Ct -CR -Sa + +clean: + rm -rf bin + +bin: + mkdir bin + +bin/%: %.pas ../src/*.pas + fpc -Fu../src -FE./bin ${FPC_OPT} $< ;\ + rm bin/*.o bin/*.ppu diff --git a/src/websocketserver.pas b/src/websocketserver.pas new file mode 100644 index 0000000..00de029 --- /dev/null +++ b/src/websocketserver.pas @@ -0,0 +1,641 @@ +unit WebSocketServer; + +{$mode objfpc}{$H+} + +interface + +uses + Classes, SysUtils, ssockets, fgl, sha1, base64, wsutils, wsstream; + +type + + { TRequestHeaders } + + TRequestHeaders = class(specialize TFPGMap<string, string>) + public + procedure Parse(const HeaderString: string); + constructor Create; + end; + + TRequestData = record + Host: string; + Path: string; + Key: string; + Headers: TRequestHeaders; + end; + + { TWebsocketHandler } + + TWebsocketHandler = class + public + function Accept(const ARequest: TRequestData; + const ResponseHeaders: TStrings): boolean; virtual; + procedure HandleCommunication(ACommunicator: TWebsocketCommunincator); virtual; + end; + + TThreadedWebsocketHandler = class(TWebsocketHandler) + public + procedure HandleCommunication(ACommunicator: TWebsocketCommunincator); override; + procedure DoHandleCommunication(ACommunication: TWebsocketCommunincator); virtual; + end; + + { THostHandler } + + THostHandler = class(specialize TStringObjectMap<TWebsocketHandler>) + private + FHost: string; + public + constructor Create(const AHost: string; FreeObjects: boolean); + property Host: string read FHost; + end; + + { THostMap } + + THostMap = class(specialize TStringObjectMap<THostHandler>) + public + constructor Create; + procedure AddHost(const AHost: THostHandler); + end; + + { TLockedHostMap } + + TLockedHostMap = class(specialize TThreadedObject<THostMap>) + public + constructor Create; + end; + + TServerAcceptingMethod = (samDefault, samThreaded, samThreadPool); + + { TWebSocketServer } + + TWebSocketServer = class + private + FSocket: TInetServer; + FHostMap: TLockedHostMap; + FFreeHandlers: boolean; + FAcceptingMethod: TServerAcceptingMethod; + + procedure DoCreate; + procedure HandleConnect(Sender: TObject; Data: TSocketStream); + public + procedure Start; + procedure Stop(DoAbort: boolean = False); + + procedure RegisterHandler(const AHost: string; const APath: string; + AHandler: TWebsocketHandler; DefaultHost: boolean = False; + DefaultPath: boolean = False); + + destructor Destroy; override; + constructor Create(const AHost: string; const APort: word; + AHandler: TSocketHandler); + constructor Create(const APort: word); + property Socket: TInetServer read FSocket; + property FreeHandlers: boolean read FFreeHandlers write FFreeHandlers; + property AcceptingMethod: TServerAcceptingMethod + read FAcceptingMethod write FAcceptingMethod; + end; + +const + MalformedRequestMessage = + 'HTTP/1.1 400 Bad Request'#13#10#13#10'Not a Websocket Request'; + ForbiddenRequestMessage = + 'HTTP/1.1 403 Forbidden'#13#10#13#10'Request not accepted by Handler'; + HandlerNotFoundMessage = 'HTTP/1.1 404 Not Found'#13#10#13#10'No Handler registered for this request'; + + +implementation + +type + + {Thread Types} + { TWebsocketHandlerThread } + + TWebsocketHandlerThread = class(TPoolableThread) + private + FCommunicator: TWebsocketCommunincator; + FHandler: TThreadedWebsocketHandler; + protected + procedure DoExecute; override; + property Handler: TThreadedWebsocketHandler read FHandler write FHandler; + property Communicator: TWebsocketCommunincator + read FCommunicator write FCommunicator; + end; + + THandlerThreadFactory = specialize TPoolableThreadFactory<TWebsocketHandlerThread>; + THandlerThreadPool = specialize TObjectPool<TWebsocketHandlerThread, + THandlerThreadFactory, THandlerThreadFactory>; + TLockedHandlerThreadPool = specialize TThreadedObject<THandlerThreadPool>; + + { TWebsocketRecieverThread } + + TWebsocketRecieverThread = class(TPoolableThread) + private + FCommunicator: TWebsocketCommunincator; + FStopped: boolean; + protected + procedure DoExecute; override; + procedure Kill; + property Communicator: TWebsocketCommunincator + read FCommunicator write FCommunicator; + end; + + TRecieverThreadFactory = specialize TPoolableThreadFactory<TWebsocketRecieverThread>; + TRecieverThreadPool = specialize TObjectPool<TWebsocketRecieverThread, + TRecieverThreadFactory, TRecieverThreadFactory>; + TLockedRecieverThreadPool = specialize TThreadedObject<TRecieverThreadPool>; + + { TWebsocketHandshakeHandler } + + TWebsocketHandshakeHandler = class + private + FStream: TSocketStream; + FHostMap: TLockedHostMap; + function ReadRequest(var RequestData: TRequestData): boolean; + function GenerateAcceptingKey(const Key: string): string; + public + procedure PerformHandshake; + constructor Create(AStream: TSocketStream; AHostMap: TLockedHostMap); + end; + + { TAcceptingThread } + + TAcceptingThread = class(TPoolableThread) + private + FHandshakeHandler: TWebsocketHandshakeHandler; + protected + procedure DoExecute; override; + + property HandshakeHandler: TWebsocketHandshakeHandler + read FHandshakeHandler write FHandshakeHandler; + end; + + TAcceptingThreadFactory = specialize TPoolableThreadFactory<TAcceptingThread>; + TAcceptingThreadPool = specialize TObjectPool<TAcceptingThread, + TAcceptingThreadFactory, TAcceptingThreadFactory>; + TLockedAcceptingThreadPool = specialize TThreadedObject<TAcceptingThreadPool>; + +var + RecieverThreadPool: TLockedRecieverThreadPool; + HandlerThreadPool: TLockedHandlerThreadPool; + AcceptingThreadPool: TLockedAcceptingThreadPool; + +function CreateAcceptingThread( + const AHandshakeHandler: TWebsocketHandshakeHandler): TAcceptingThread; inline; +var + pool: TAcceptingThreadPool; +begin + pool := AcceptingThreadPool.Lock; + try + Result := pool.GetObject; + Result.HandshakeHandler := AHandshakeHandler; + Result.Restart; + finally + AcceptingThreadPool.Unlock; + end; +end; + +function CreateHandlerThread(const ACommunicator: TWebsocketCommunincator; + const AHandler: TThreadedWebsocketHandler): TWebsocketHandlerThread; inline; +var + pool: THandlerThreadPool; +begin + pool := HandlerThreadPool.Lock; + try + Result := pool.GetObject; + Result.Communicator := ACommunicator; + Result.Handler := AHandler; + Result.Restart; + finally + HandlerThreadPool.Unlock; + end; +end; + +function CreateRecieverThread(const ACommunicator: TWebsocketCommunincator): +TWebsocketRecieverThread; inline; +var + pool: TRecieverThreadPool; +begin + pool := RecieverThreadPool.Lock; + try + Result := pool.GetObject; + Result.Communicator := ACommunicator; + Result.Restart; + finally + RecieverThreadPool.Unlock; + end; +end; + +{ TRequestHeaders } + +function DoHeaderKeyCompare(const Key1, Key2: string): integer; +begin + // Headers are case insensetive + Result := CompareStr(Key1.ToLower, Key2.ToLower); +end; + +{ TWebsocketHandlerThread } + +procedure TWebsocketHandlerThread.DoExecute; +var + Recv: TWebsocketRecieverThread; +begin + Recv := CreateRecieverThread(FCommunicator); + try + try + FHandler.DoHandleCommunication(FCommunicator); + finally + FCommunicator.Close; + FCommunicator.Free; + end; + finally + Recv.Kill; + end; +end; + +procedure TAcceptingThread.DoExecute; +begin + FHandshakeHandler.PerformHandshake; +end; + +{ TWebsocketRecieverThread } + +procedure TWebsocketRecieverThread.DoExecute; +begin + FStopped := False; + while not Terminated and not FStopped and FCommunicator.Open do + begin + FCommunicator.RecieveMessage; + Sleep(10); + end; +end; + +procedure TWebsocketRecieverThread.Kill; +begin + FStopped := True; +end; + +{ THostHandler } + +constructor THostHandler.Create(const AHost: string; FreeObjects: boolean); +begin + FHost := AHost; + inherited Create(FreeObjects); +end; + +{ TWebsocketHandler } + +function TWebsocketHandler.Accept(const ARequest: TRequestData; + const ResponseHeaders: TStrings): boolean; +begin + Result := True; +end; + +procedure TWebsocketHandler.HandleCommunication( + ACommunicator: TWebsocketCommunincator); +begin + // No implementation; To be overriden +end; + +procedure TThreadedWebsocketHandler.HandleCommunication( + ACommunicator: TWebsocketCommunincator); +begin + CreateHandlerThread(ACommunicator, Self); +end; + +procedure TThreadedWebsocketHandler.DoHandleCommunication( + ACommunication: TWebsocketCommunincator); +begin + // No implementation; To be overriden +end; + +{ THostMap } + +constructor THostMap.Create; +begin + inherited Create(True); +end; + +procedure THostMap.AddHost(const AHost: THostHandler); +begin + Objects[AHost.FHost] := AHost; +end; + +{ TLockedHostMap } + +constructor TLockedHostMap.Create; +begin + inherited Create(THostMap.Create); +end; + +procedure TRequestHeaders.Parse(const HeaderString: string); +var + sl: TStringList; + s: string; + p: integer; +begin + sl := TStringList.Create; + try + sl.TextLineBreakStyle := tlbsCRLF; + sl.Text := HeaderString; + for s in sl do + begin + // Use sl.Values instead? + p := s.IndexOf(':'); + if p > 0 then + Self.KeyData[s.Substring(0, p).ToLower] := s.Substring(p + 1).Trim; + end; + finally + sl.Free; + end; +end; + +constructor TRequestHeaders.Create; +begin + inherited Create; + Self.OnKeyCompare := @DoHeaderKeyCompare; + // Binary search => faster access + Self.Sorted := True; +end; + +{ TWebsocketHandshakeHandler } + +function TWebsocketHandshakeHandler.ReadRequest(var RequestData: TRequestData): boolean; +var + method: string; + proto: string; + headerstr: string; + upg: string; + conn: string; + version: string; +begin + Result := False; + // Check if this is HTTP by checking the first line + // Method GET is required + SetLength(method, 4); + FStream.ReadBuffer(method[1], 4); + if method <> 'GET ' then + begin + // Not GET + Exit; + end; + // Read path and HTTP version + FStream.ReadTo(' ', RequestData.Path); + FStream.ReadTo(#13#10, proto, 10); + RequestData.Path := RequestData.Path.TrimRight; + proto := proto.TrimRight.ToLower; + if not proto.StartsWith('http/') then + begin + // Only accept http/1.1 + Exit; + end; + if not proto.EndsWith('1.1') then + begin + // non 1.1 version: return forbidden + Exit; + end; + // Headers are separated by 2 newlines (CR+LF) + FStream.ReadTo(#13#10#13#10, headerstr, 2048); + RequestData.Headers.Parse(headerstr.Trim); + if not (RequestData.Headers.TryGetData('Upgrade', upg) and + RequestData.Headers.TryGetData('Connection', conn) and + RequestData.Headers.TryGetData('Sec-WebSocket-Key', RequestData.Key) and + (upg = 'websocket') and (conn.Contains('Upgrade'))) then + begin + // Seems to be a normal HTTP request, we only handle websockets + Exit; + end; + // How to handle this? + if not RequestData.Headers.TryGetData('Sec-WebSocket-Version', version) then + version := ''; + if not RequestData.Headers.TryGetData('Host', RequestData.Host) then + RequestData.Host := ''; + Result := True; +end; + +function TWebsocketHandshakeHandler.GenerateAcceptingKey(const Key: string): string; +var + concatKey: string; + keyHash: TSHA1Digest; + OutputStream: TStringStream; + b64Encoder: TBase64EncodingStream; +const + WebsocketMagicString = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'; +begin + // Key = Base64(SHA1(Key + MagicString)) + concatKey := Key + WebsocketMagicString; + keyHash := SHA1String(concatKey); + OutputStream := TStringStream.Create(''); + try + b64Encoder := TBase64EncodingStream.Create(OutputStream); + try + b64Encoder.WriteBuffer(keyHash[low(keyHash)], Length(keyHash)); + b64Encoder.Flush; + Result := OutputStream.DataString; + finally + b64Encoder.Free; + end; + finally + OutputStream.Free; + end; +end; + +procedure TWebsocketHandshakeHandler.PerformHandshake; +var + RequestData: TRequestData; + hm: THostMap; + hh: THostHandler; + sh: TWebsocketHandler; + ResponseHeaders: TStringList; + i: integer; + HandsakeResponse: TStringList; + Comm: TWebsocketCommunincator; +begin + try + RequestData.Headers := TRequestHeaders.Create; + try + // Reqding request + try + if not ReadRequest(RequestData) then + begin + FStream.WriteRaw(MalformedRequestMessage); + FStream.Free; + Exit; + end; + except + on E: EReadError do + begin + FStream.WriteRaw(MalformedRequestMessage); + FStream.Free; + Exit; + end; + end; + // Getting responsible handler + hm := FHostMap.Lock; + try + hh := hm.Objects[RequestData.Host]; + if not Assigned(hh) then + begin + FStream.WriteRaw(HandlerNotFoundMessage); + FStream.Free; + Exit; + end; + sh := hh.Objects[RequestData.Path]; + if not Assigned(sh) then + begin + FStream.WriteRaw(HandlerNotFoundMessage); + FStream.Free; + Exit; + end; + finally + FHostMap.Unlock; + end; + // Checking if handler wants to accept + ResponseHeaders := TStringList.Create; + try + ResponseHeaders.NameValueSeparator := ':'; + if not sh.Accept(RequestData, ResponseHeaders) then + begin + FStream.WriteRaw(ForbiddenRequestMessage); + FStream.Free; + Exit; + end; + // Neseccary headers + ResponseHeaders.Values['Connection'] := 'Upgrade'; + ResponseHeaders.Values['Upgrade'] := 'websocket'; + ResponseHeaders.Values['Sec-WebSocket-Accept'] := + GenerateAcceptingKey(RequestData.Key); + // Generating response + HandsakeResponse := TStringList.Create; + try + HandsakeResponse.TextLineBreakStyle := tlbsCRLF; + HandsakeResponse.Add('HTTP/1.1 101 Switching Protocols'); + for i := 0 to ResponseHeaders.Count - 1 do + HandsakeResponse.Add('%s: %s'.Format([ResponseHeaders.Names[i], + ResponseHeaders.ValueFromIndex[i]])); + HandsakeResponse.Add(''); + + FStream.WriteRaw(HandsakeResponse.Text); + finally + HandsakeResponse.Free; + end; + finally + ResponseHeaders.Free; + end; + finally + RequestData.Headers.Free; + end; + Comm := TWebsocketCommunincator.Create(TLockedSocketStream.Create(FStream), + False, True); + finally + // Not needed anymore, we can now die in piece. + // All information requier for the rest is now on the stack + Self.Free; + end; + sh.HandleCommunication(Comm); +end; + +constructor TWebsocketHandshakeHandler.Create(AStream: TSocketStream; + AHostMap: TLockedHostMap); +begin + FHostMap := AHostMap; + FStream := AStream; +end; + +{ TWebSocketServer } + +procedure TWebSocketServer.DoCreate; +begin + FSocket.OnConnect := @HandleConnect; + FHostMap := TLockedHostMap.Create; + FFreeHandlers := True; + FAcceptingMethod := samDefault; +end; + +procedure TWebSocketServer.HandleConnect(Sender: TObject; Data: TSocketStream); +var + HandshakeHandler: TWebsocketHandshakeHandler; + t: TAcceptingThread; +begin + HandshakeHandler := TWebsocketHandshakeHandler.Create(Data, FHostMap); + case AcceptingMethod of + samDefault: + HandshakeHandler.PerformHandshake; + samThreaded: + begin + t := TAcceptingThread.Create(True); + t.DoTerminate := True; + t.FreeOnTerminate := True; + t.HandshakeHandler := HandshakeHandler; + t.Restart; + end; + samThreadPool: + CreateAcceptingThread(HandshakeHandler); + end; +end; + +procedure TWebSocketServer.Start; +begin + FSocket.StartAccepting; +end; + +procedure TWebSocketServer.Stop(DoAbort: boolean); +begin + FSocket.StopAccepting(DoAbort); +end; + +procedure TWebSocketServer.RegisterHandler(const AHost: string; + const APath: string; AHandler: TWebsocketHandler; DefaultHost: boolean; + DefaultPath: boolean); +var + map: THostMap; + hh: THostHandler; +begin + map := FHostMap.Lock; + try + if not map.TryGetObject(AHost, hh) then + begin + hh := THostHandler.Create(AHost, FFreeHandlers); + map.AddHost(hh); + end; + if DefaultHost then + map.DefaultObject := hh; + hh[APath] := AHandler; + if DefaultPath then + hh.DefaultObject := AHandler; + finally + FHostMap.Unlock; + end; +end; + +destructor TWebSocketServer.Destroy; +begin + Stop(True); + FSocket.Free; + FHostMap.Free; + inherited Destroy; +end; + +constructor TWebSocketServer.Create(const AHost: string; const APort: word; + AHandler: TSocketHandler); +begin + FSocket := TInetServer.Create(AHost, APort, AHandler); + DoCreate; +end; + +constructor TWebSocketServer.Create(const APort: word); +begin + FSocket := TInetServer.Create(APort); + DoCreate; +end; + +initialization + AcceptingThreadPool := TLockedAcceptingThreadPool.Create(TAcceptingThreadPool.Create); + HandlerThreadPool := TLockedHandlerThreadPool.Create(THandlerThreadPool.Create); + RecieverThreadPool := TLockedRecieverThreadPool.Create(TRecieverThreadPool.Create); + +finalization + AcceptingThreadPool.Free; + RecieverThreadPool.Free; + HandlerThreadPool.Free; + +end. diff --git a/src/wsmessages.pas b/src/wsmessages.pas new file mode 100644 index 0000000..2c376f8 --- /dev/null +++ b/src/wsmessages.pas @@ -0,0 +1,97 @@ +unit wsmessages; + +{$mode objfpc}{$H+} + +interface + +uses + Classes, SysUtils, fgl, wsutils; + +type + // Represent opcodes + TWebsocketMessageType = (wmtContinue = 0, wmtString = 1, wmtBinary = + 2, wmtClose = 8, wmtPing = 9, wmtPong = 10); + + { TWebsocketMessage } + + TWebsocketMessage = class + private + FMessageType: TWebsocketMessageType; + public + constructor Create(const AMessageType: TWebsocketMessageType); + property MessageType: TWebsocketMessageType read FMessageType; + end; + + { TWebsocketStringMessage } + + TWebsocketStringMessage = class(TWebsocketMessage) + private + FData: UTF8String; + public + constructor Create(const AData: UTF8String); + property Data: UTF8String read FData; + end; + + { TWebsocketPongMessage } + + TWebsocketPongMessage = class(TWebsocketMessage) + private + FData: UTF8String; + public + constructor Create(const AData: UTF8String); + property Data: UTF8String read FData; + end; + + { TWebsocketBinaryMessage } + + TWebsocketBinaryMessage = class(TWebsocketMessage) + private + FData: TBytes; + public + constructor Create(const AData: TBytes); + property Data: TBytes read FData; + end; + + TWebsocketMessageList = class(specialize TFPGList<TWebsocketMessage>); + TWebsocketMessageOwnerList = class(specialize TFPGObjectList<TWebsocketMessage>); + TLockedWebsocketMessageList = class(specialize TThreadedObject<TWebsocketMessageList>); + +implementation + + +{ TWebsocketMessage } + +constructor TWebsocketMessage.Create(const AMessageType: TWebsocketMessageType); +begin + FMessageType := AMessageType; +end; + +{ TWebsocketStringMessage } + +constructor TWebsocketStringMessage.Create(const AData: UTF8String); +begin + inherited Create(wmtString); + FData := AData; + SetLength(FData, Length(FData)); +end; + +{ TWebsocketPongMessage } + +constructor TWebsocketPongMessage.Create(const AData: UTF8String); +begin + inherited Create(wmtPong); + FData := AData; + SetLength(FData, Length(FData)); +end; + +{ TWebsocketBinaryMessage } + +constructor TWebsocketBinaryMessage.Create(const AData: TBytes); +begin + inherited Create(wmtBinary); + FData := AData; + SetLength(FData, Length(FData)); +end; + +end. + diff --git a/src/wsstream.pas b/src/wsstream.pas new file mode 100644 index 0000000..f8320ed --- /dev/null +++ b/src/wsstream.pas @@ -0,0 +1,654 @@ +unit wsstream; + +{$mode objfpc}{$H+} + +interface + +uses + Classes, SysUtils, ssockets, wsmessages, Sockets; + +type + + { EWebsocketError } + + EWebsocketError = class(Exception) + private + FCode: integer; + public + constructor Create(const msg: string; ACode: integer); + property Code: integer read FCode; + end; + + EWebsocketWriteError = class(EWebsocketError); + + EWebsocketReadError = class(EWebsocketError); + + TNetAddress = record + Address: string; + Port: integer; + end; + + { TLockedSocketStream } + + TLockedSocketStream = class + private + FLocalAddress: TNetAddress; + FRemoteAddress: TNetAddress; + FStream: TSocketStream; + FLock: TRTLCriticalSection; + function isOpen: boolean; + public + constructor Create(const AStream: TSocketStream); + destructor Destroy; override; + + function Lock: TSocketStream; + procedure Unlock; + procedure CloseStream; + property Open: boolean read isOpen; + property RemoteAddress: TNetAddress read FRemoteAddress; + property LocalAddress: TNetAddress read FLocalAddress; + end; + + { TWebsocketMessageStream } + + TWebsocketMessageStream = class(TStream) + private + FDataStream: TLockedSocketStream; + FMaxFrameSize: int64; + FMessageType: TWebsocketMessageType; + FBuffer: TBytes; + FCurrentLen: int64; + FFirstWrite: boolean; + FMaskKey: integer; + + procedure WriteDataFrame(Finished: boolean = False); + public + constructor Create(const ADataStream: TLockedSocketStream; + AMessageType: TWebsocketMessageType = wmtString; + AMaxFrameLen: int64 = 125; AMaskKey: integer = -1); + destructor Destroy; override; + function Seek(Offset: longint; Origin: word): longint; override; + function Read(var Buffer; Count: longint): longint; override; + function Write(const Buffer; Count: longint): longint; override; + end; + + { TWebsocketCommunincator } + + TWebsocketCommunincator = class + private + FStream: TLockedSocketStream; + FMessages: TLockedWebsocketMessageList; + FMaskMessages: boolean; + FAssumeMaskedMessages: boolean; + FOnRecieveMessage: TNotifyEvent; + FOnClose: TNotifyEvent; + FExpectClose: boolean; + function GenerateMask: integer; + function GetOpen: boolean; + public + constructor Create(AStream: TLockedSocketStream; AMaskMessage: boolean; + AssumeMaskedMessages: boolean); + destructor Destroy; override; + + procedure Close(ForceClose: boolean = False); + + procedure RecieveMessage; + function GetUnprocessedMessages(const MsgList: TWebsocketMessageOwnerList): integer; + + function WriteMessage(MessageType: TWebsocketMessageType = wmtString; + MaxFrameLength: int64 = 125): TWebsocketMessageStream; + + property OnRecieveMessage: TNotifyEvent read FOnRecieveMessage + write FOnRecieveMessage; + property OnClose: TNotifyEvent read FOnClose write FOnClose; + property SocketStream: TLockedSocketStream read FStream; + property Open: boolean read GetOpen; + end; + +implementation +{*------------------------------------------------------------------------------ + * extension of htons and htonl for qwords (ll: long long from C) + *-----------------------------------------------------------------------------} +function htonll(host: QWord): QWord; inline; +begin +{$ifdef FPC_BIG_ENDIAN} + Result := host; +{$else} + Result := SwapEndian(host); +{$endif} +end; + +function ntohll(net: QWord): QWord; inline; +begin +{$ifdef FPC_BIG_ENDIAN} + Result := net; +{$else} + Result := SwapEndian(net); +{$endif} +end; + + +type + { Protocol specific types } + TWebsocketFrameHeader = record + Fin: boolean; + OPCode: TWebsocketMessageType; + Mask: boolean; + PayloadLen: byte; + end; + TMaskRec = record + case boolean of + True: (Bytes: array[0..3] of byte); + False: (Key: integer); + end; + TWordRec = record + case boolean of + True: (Bytes: array[0..1] of byte); + False: (Value: word); + end; + +function WordToFrameHeader(Data: word): TWebsocketFrameHeader; inline; +var + wordRec: TWordRec; +begin + wordRec.Value := Data; + Result.Fin := (wordRec.Bytes[0] and 128) = 128; + Result.OPCode := TWebsocketMessageType(wordRec.Bytes[0] and %1111); + Result.Mask := (wordRec.Bytes[1] and 128) = 128; + Result.PayloadLen := wordRec.Bytes[1] and %1111111; +end; + +function boolToBit(b: boolean; Bit: byte): byte; inline; +begin + Result := 0; + if b then + Result := 1 shl Bit; +end; + +function FrameHEaderToWord(const Header: TWebsocketFrameHeader): word; inline; +var + wordRec: TWordRec; +begin + wordRec.Bytes[0] := boolToBit(Header.Fin, 7) or (Ord(Header.OPCode) and %1111); + wordRec.Bytes[1] := boolToBit(Header.Mask, 7) or (Header.PayloadLen and %1111111); + Result := wordRec.Value; +end; + +{ EWebsocketError } + +constructor EWebsocketError.Create(const msg: string; ACode: integer); +begin + inherited Create(msg); + FCode := ACode; +end; + +{ TLockedSocketStream } + +function TLockedSocketStream.isOpen: boolean; +begin + Lock; + try + Result := Assigned(FStream); + finally + Unlock; + end; +end; + +constructor TLockedSocketStream.Create(const AStream: TSocketStream); +begin + FLocalAddress.Address := NetAddrToStr(AStream.LocalAddress.sin_addr); + FLocalAddress.Port := AStream.LocalAddress.sin_port; + FRemoteAddress.Address := NetAddrToStr(AStream.RemoteAddress.sin_addr); + FRemoteAddress.Port := AStream.LocalAddress.sin_port; + FStream := AStream; + InitCriticalSection(FLock); +end; + +destructor TLockedSocketStream.Destroy; +begin + CloseStream; + DoneCriticalsection(FLock); + inherited Destroy; +end; + +function TLockedSocketStream.Lock: TSocketStream; +begin + EnterCriticalsection(FLock); + Result := FStream; +end; + +procedure TLockedSocketStream.Unlock; +begin + LeaveCriticalsection(FLock); +end; + +procedure TLockedSocketStream.CloseStream; +begin + Lock; + try + FreeAndNil(FStream); + finally + Unlock; + end; +end; + +{ TWebsocketCommunincator } + +function TWebsocketCommunincator.GenerateMask: integer; +begin + Result := -1; + if FMaskMessages then // Not really secure... + Result := integer(Random(DWord.MaxValue)); +end; + +function TWebsocketCommunincator.GetOpen: boolean; +begin + Result := FStream.Open; +end; + +constructor TWebsocketCommunincator.Create(AStream: TLockedSocketStream; + AMaskMessage: boolean; AssumeMaskedMessages: boolean); +begin + FStream := AStream; + FMaskMessages := AMaskMessage; + FAssumeMaskedMessages := AssumeMaskedMessages; + FMessages := TLockedWebsocketMessageList.Create(TWebsocketMessageList.Create); + FExpectClose := False; +end; + +destructor TWebsocketCommunincator.Destroy; +begin + // Ending communication => Close stream + Close(True); + FStream.Free; + FMessages.Free; + inherited Destroy; +end; + +procedure TWebsocketCommunincator.Close(ForceClose: boolean); +begin + if not Open then + Exit; + if not ForceClose then + begin + WriteMessage(wmtClose).Free; + FExpectClose := True; + Exit; + end; + if Assigned(FOnClose) then + FOnClose(Self); + FStream.CloseStream; +end; + +procedure TWebsocketCommunincator.RecieveMessage; + + procedure ReadData(var buffer; const len: int64); + var + ToRead: longint; + Read: longint; + LeftToRead: int64; + TotalRead: int64; + oldTO: integer; + Stream: TSocketStream; + const + IOTimeoutError = 11; + WaitingTime = 10; + begin + TotalRead := 0; + repeat + // how much we are trying to read at a time + LeftToRead := len - TotalRead; + if LeftToRead > ToRead.MaxValue then + ToRead := ToRead.MaxValue + else + ToRead := LeftToRead; + // Reading + + Stream := FStream.Lock; + try + if not Assigned(Stream) then + begin + raise EWebsocketReadError.Create('Socket already closed', 0); + end; + oldTO := Stream.IOTimeout; + Stream.IOTimeout := 1; + try + Read := Stream.Read(PByte(@buffer)[TotalRead], ToRead); + if Read < 0 then + begin + // on Error + if Stream.LastError <> IOTimeoutError then + raise EWebsocketReadError.Create('error reading from stream', + Stream.LastError); + end + else + begin + // Increase the amount to read + TotalRead += Read; + end; + finally + Stream.IOTimeout := oldTO; + end; + finally + FStream.Unlock; + end; + if (TotalRead < len) and (Read <> ToRead) then // not finished, wait for some data + Sleep(WaitingTime); + until TotalRead >= len; + end; + + procedure AddMessageToList(Message: TWebsocketMessage); + var + lst: TWebsocketMessageList; + begin + if Assigned(Message) then + begin + lst := FMessages.Lock; + try + lst.Add(Message); + finally + FMessages.Unlock; + end; + if Assigned(FOnRecieveMessage) then + begin + FOnRecieveMessage(Self); + end; + end; + end; + + function ProcessSpecialMessages(messageType: TWebsocketMessageType; + var buffer; const buffLen: int64): boolean; + var + str: UTF8String; + begin + Result := True; + case messageType of + wmtClose: + begin + // If we didn't send the original close, return the message + if not FExpectClose then + WriteMessage(wmtClose).Free; + // Close the stream (true to not send a message + Close(True); + end; + wmtPing: + begin + // On ping send pong, with same content + with WriteMessage(wmtPong) do + try + if buffLen > 0 then + Write(PByte(@buffer)[0], buffLen); + finally + Free; + end; + end; + wmtPong: + begin + // lift pong message to message queue, so user can handle it + SetLength(str, buffLen); + if buffLen > 0 then + Move(buffer, str[1], buffLen); + AddMessageToList(TWebsocketPongMessage.Create(str)); + end + else + Result := False; + end; + end; + +var + Header: TWebsocketFrameHeader; + len64: int64; + len16: word; + len: int64; + MaskRec: TMaskRec; + buffer: TBytes; + i: int64; + Message: TWebsocketMessage; + outputStream: TMemoryStream; + messageType: TWebsocketMessageType; + msgType: TWebsocketMessageType; + str: UTF8String; + w: word; +begin + Message := nil; + outputStream := TMemoryStream.Create; + msgType:=wmtContinue; + try + try + repeat + if not Open then + Exit; + ReadData(w, 2); + Header := WordToFrameHeader(w); + if Header.OPCode <> wmtContinue then + messageType := TWebsocketMessageType(Header.OPCode); + if Header.PayloadLen < 126 then + len := Header.PayloadLen + else if Header.PayloadLen = 126 then + begin + ReadData(len16, SizeOf(len16)); + len := NToHs(len16); + end + else + begin + ReadData(len64, SizeOf(len64)); + len := ntohll(len64); + end; + if Header.Mask then + begin + ReadData(MaskRec.Key, SizeOf(MaskRec.Key)); + end + else if FAssumeMaskedMessages then + begin + Close(True); + Exit; + end; + // Read payload + SetLength(buffer, len); + if len > 0 then + begin + ReadData(buffer[0], len); + if Header.Mask then + begin + // As this is 64 bit, to be 32 bit compatible we can't use a for loop + i := 0; + while i < len do + begin + buffer[i] := buffer[i] xor MaskRec.Bytes[i mod 4]; + Inc(i); + end; + end; + end; + // Handling special messages + if ProcessSpecialMessages(messageType, PByte(buffer)^, len) then + begin + // am i in the middle of a communication? + // If so dont use the fin in the end + if msgType <> wmtContinue then Continue; + end + else + begin + if messageType <> wmtContinue then + msgType:=messageType; + // This is a dataframe, so save data for concatination of fragments + if len > 0 then + outputStream.WriteBuffer(buffer[0], len); + end; + until Header.Fin; + // Read whole message + outputStream.Seek(0, soBeginning); + case msgType of + wmtString: + begin + SetLength(str, outputStream.Size); + outputStream.ReadBuffer(str[1], outputStream.Size); + Message := TWebsocketStringMessage.Create(str); + end; + wmtBinary: + begin + SetLength(buffer, outputStream.Size); + outputStream.ReadBuffer(buffer[0], outputStream.Size); + Message := TWebsocketBinaryMessage.Create(buffer); + end; + end; + AddMessageToList(Message); + finally + outputStream.Free; + end; + except + On e: EWebsocketReadError do + begin + if e.Code = 0 then + begin + // Stream has been closed + Close(True); + end; + end; + end; +end; + +function TWebsocketCommunincator.WriteMessage(MessageType: TWebsocketMessageType; + MaxFrameLength: int64): TWebsocketMessageStream; +begin + Result := TWebsocketMessageStream.Create(FStream, MessageType, + MaxFrameLength, generateMask); +end; + +function TWebsocketCommunincator.GetUnprocessedMessages( + const MsgList: TWebsocketMessageOwnerList): integer; +var + lst: TWebsocketMessageList; + m: TWebsocketMessage; +begin + lst := FMessages.Lock; + try + Result := lst.Count; + for m in lst do + MsgList.Add(m); + lst.Clear; + finally + FMessages.Unlock; + end; +end; + +{ TWebsocketMessageStream } + +procedure TWebsocketMessageStream.WriteDataFrame(Finished: boolean); +var + Header: TWebsocketFrameHeader; + i: int64; + MaskRec: TMaskRec; + Stream: TSocketStream; +begin + Stream := FDataStream.Lock; + try + if not Assigned(Stream) then + begin + raise EWebsocketWriteError.Create('Stream already closed', 0); + end; + try + Header.Fin := Finished; + Header.Mask := (FMaskKey <> -1); + if FFirstWrite then + Header.OPCode := FMessageType + else + Header.OPCode := wmtContinue; + // Compute size + if FCurrentLen < 126 then + Header.PayloadLen := FCurrentLen + else if FCurrentLen <= word.MaxValue then + Header.PayloadLen := 126 + else + Header.PayloadLen := 127; + // Write header + Stream.WriteWord(FrameHEaderToWord(Header)); + // Write size if it exceeds 125 + if (FCurrentLen > 125) then + begin + if (FCurrentLen <= word.MaxValue) then + Stream.WriteWord(htons(word(FCurrentLen))) + else + Stream.WriteQWord(htonll(QWord(FCurrentLen))); + end; + if Header.Mask then + begin + // If we use a mask + MaskRec.Key := FMaskKey; + // First: Transmit mask Key + Stream.WriteBuffer(MaskRec.Bytes[0], 4); + // 2. Encode Message + // As this is 64 bit, to be 32 bit compatible we can't use a for loop + i := 0; + while i < FCurrentLen do + begin + FBuffer[i] := FBuffer[i] xor MaskRec.Bytes[i mod 4]; + Inc(i); + end; + end; + // Write Message payload + Stream.WriteBuffer(FBuffer[0], FCurrentLen); + // Reset state for next data + FCurrentLen := 0; + except + on E: EWriteError do + raise EWebsocketWriteError.Create(e.Message, Stream.LastError); + end; + finally + FDataStream.Unlock; + end; +end; + +constructor TWebsocketMessageStream.Create(const ADataStream: TLockedSocketStream; + AMessageType: TWebsocketMessageType; AMaxFrameLen: int64; AMaskKey: integer); +begin + FDataStream := ADataStream; + FMaxFrameSize := AMaxFrameLen; + FMessageType := AMessageType; + SetLength(FBuffer, AMaxFrameLen); + FCurrentLen := 0; + FFirstWrite := True; + FMaskKey := AMaskKey; +end; + +destructor TWebsocketMessageStream.Destroy; +begin + WriteDataFrame(True); + inherited Destroy; +end; + +function TWebsocketMessageStream.Seek(Offset: longint; Origin: word): longint; +begin + // We cant seek + Result := 0; +end; + +function TWebsocketMessageStream.Read(var Buffer; Count: longint): longint; +begin + // Write only stream + Result := 0; +end; + +function TWebsocketMessageStream.Write(const Buffer; Count: longint): longint; +var + ToWrite: integer; +begin + while FCurrentLen + Count > FMaxFrameSize do + begin + // Doesn't fit into one dataframe + // So we split it up into multiple + ToWrite := FMaxFrameSize - FCurrentLen; + Move(Buffer, FBuffer[FCurrentLen], ToWrite); + FCurrentLen := FMaxFrameSize; + WriteDataFrame(False); + // Now FCurrentLen should be 0 again + // Only decrese the count + Dec(Count, ToWrite); + end; + Move(Buffer, FBuffer[FCurrentLen], Count); + FCurrentLen += Count; + Result := Count; +end; + + +end. + diff --git a/utilities.pas b/src/wsutils.pas similarity index 95% rename from utilities.pas rename to src/wsutils.pas index f03aae3..f122158 100644 --- a/utilities.pas +++ b/src/wsutils.pas @@ -1,4 +1,4 @@ -unit utilities; +unit wsutils; {$mode objfpc}{$H+} diff --git a/websocket.pas b/websocket.pas deleted file mode 100644 index d731f41..0000000 --- a/websocket.pas +++ /dev/null @@ -1,1357 +0,0 @@ -unit WebSocket; - -{$mode objfpc}{$H+} - -interface - -uses - Classes, SysUtils, ssockets, fgl, sha1, base64, utilities, Sockets; - -type - - { EWebsocketError } - - EWebsocketError = class(Exception) - private - FCode: integer; - public - constructor Create(const msg: string; ACode: integer); - property Code: integer read FCode; - end; - - EWebsocketWriteError = class(EWebsocketError); - - EWebsocketReadError = class(EWebsocketError); - - { TRequestHeaders } - - TRequestHeaders = class(specialize TFPGMap<string, string>) - public - procedure Parse(const HeaderString: string); - constructor Create; - end; - - TRequestData = record - Host: string; - Path: string; - Key: string; - Headers: TRequestHeaders; - end; - - // Represent opcodes - TWebsocketMessageType = (wmtContinue = 0, wmtString = 1, wmtBinary = - 2, wmtClose = 8, wmtPing = 9, wmtPong = 10); - - { TWebsocketMessage } - - TWebsocketMessage = class - private - FMessageType: TWebsocketMessageType; - public - constructor Create(const AMessageType: TWebsocketMessageType); - property MessageType: TWebsocketMessageType read FMessageType; - end; - - { TWebsocketStringMessage } - - TWebsocketStringMessage = class(TWebsocketMessage) - private - FData: UTF8String; - public - constructor Create(const AData: UTF8String); - property Data: UTF8String read FData; - end; - - { TWebsocketStringMessage } - - { TWebsocketPongMessage } - - TWebsocketPongMessage = class(TWebsocketMessage) - private - FData: UTF8String; - public - constructor Create(const AData: UTF8String); - property Data: UTF8String read FData; - end; - - { TWebsocketBinaryMessage } - - TWebsocketBinaryMessage = class(TWebsocketMessage) - private - FData: TBytes; - public - constructor Create(const AData: TBytes); - property Data: TBytes read FData; - end; - - TWebsocketMessageList = class(specialize TFPGList<TWebsocketMessage>); - TWebsocketMessageOwnerList = class(specialize TFPGObjectList<TWebsocketMessage>); - TLockedWebsocketMessageList = class(specialize TThreadedObject<TWebsocketMessageList>); - - TNetAddress = record - Address: string; - Port: integer; - end; - - { TLockedSocketStream } - - TLockedSocketStream = class - private - FLocalAddress: TNetAddress; - FRemoteAddress: TNetAddress; - FStream: TSocketStream; - FLock: TRTLCriticalSection; - function isOpen: boolean; - public - constructor Create(const AStream: TSocketStream); - destructor Destroy; override; - - function Lock: TSocketStream; - procedure Unlock; - procedure CloseStream; - property Open: boolean read isOpen; - property RemoteAddress: TNetAddress read FRemoteAddress; - property LocalAddress: TNetAddress read FLocalAddress; - end; - - { TWebsocketMessageStream } - - TWebsocketMessageStream = class(TStream) - private - FDataStream: TLockedSocketStream; - FMaxFrameSize: int64; - FMessageType: TWebsocketMessageType; - FBuffer: TBytes; - FCurrentLen: int64; - FFirstWrite: boolean; - FMaskKey: integer; - - procedure WriteDataFrame(Finished: boolean = False); - public - constructor Create(const ADataStream: TLockedSocketStream; - AMessageType: TWebsocketMessageType = wmtString; - AMaxFrameLen: int64 = 125; AMaskKey: integer = -1); - destructor Destroy; override; - function Seek(Offset: longint; Origin: word): longint; override; - function Read(var Buffer; Count: longint): longint; override; - function Write(const Buffer; Count: longint): longint; override; - end; - - { TWebsocketCommunincator } - - TWebsocketCommunincator = class - private - FStream: TLockedSocketStream; - FMessages: TLockedWebsocketMessageList; - FMaskMessages: boolean; - FAssumeMaskedMessages: boolean; - FOnRecieveMessage: TNotifyEvent; - FOnClose: TNotifyEvent; - FExpectClose: boolean; - function GenerateMask: integer; - function GetOpen: boolean; - public - constructor Create(AStream: TLockedSocketStream; AMaskMessage: boolean; - AssumeMaskedMessages: boolean); - destructor Destroy; override; - - procedure Close(ForceClose: boolean = False); - - procedure RecieveMessage; - function GetUnprocessedMessages(const MsgList: TWebsocketMessageOwnerList): integer; - - function WriteMessage(MessageType: TWebsocketMessageType = wmtString; - MaxFrameLength: int64 = 125): TWebsocketMessageStream; - - property OnRecieveMessage: TNotifyEvent read FOnRecieveMessage - write FOnRecieveMessage; - property OnClose: TNotifyEvent read FOnClose write FOnClose; - property SocketStream: TLockedSocketStream read FStream; - property Open: boolean read GetOpen; - end; - - { TWebsocketHandler } - - TWebsocketHandler = class - public - function Accept(const ARequest: TRequestData; - const ResponseHeaders: TStrings): boolean; virtual; - procedure HandleCommunication(ACommunicator: TWebsocketCommunincator); virtual; - end; - - TThreadedWebsocketHandler = class(TWebsocketHandler) - public - procedure HandleCommunication(ACommunicator: TWebsocketCommunincator); override; - procedure DoHandleCommunication(ACommunication: TWebsocketCommunincator); virtual; - end; - - - { THostHandler } - - THostHandler = class(specialize TStringObjectMap<TWebsocketHandler>) - private - FHost: string; - public - constructor Create(const AHost: string; FreeObjects: boolean); - property Host: string read FHost; - end; - - { THostMap } - - THostMap = class(specialize TStringObjectMap<THostHandler>) - public - constructor Create; - procedure AddHost(const AHost: THostHandler); - end; - - { TLockedHostMap } - - TLockedHostMap = class(specialize TThreadedObject<THostMap>) - public - constructor Create; - end; - - TServerAcceptingMethod = (samDefault, samThreaded, samThreadPool); - - { TWebSocketServer } - - TWebSocketServer = class - private - FSocket: TInetServer; - FHostMap: TLockedHostMap; - FFreeHandlers: boolean; - FAcceptingMethod: TServerAcceptingMethod; - - procedure DoCreate; - procedure HandleConnect(Sender: TObject; Data: TSocketStream); - public - procedure Start; - procedure Stop(DoAbort: boolean = False); - - procedure RegisterHandler(const AHost: string; const APath: string; - AHandler: TWebsocketHandler; DefaultHost: boolean = False; - DefaultPath: boolean = False); - - destructor Destroy; override; - constructor Create(const AHost: string; const APort: word; - AHandler: TSocketHandler); - constructor Create(const APort: word); - property Socket: TInetServer read FSocket; - property FreeHandlers: boolean read FFreeHandlers write FFreeHandlers; - property AcceptingMethod: TServerAcceptingMethod - read FAcceptingMethod write FAcceptingMethod; - end; - -const - MalformedRequestMessage = - 'HTTP/1.1 400 Bad Request'#13#10#13#10'Not a Websocket Request'; - ForbiddenRequestMessage = - 'HTTP/1.1 403 Forbidden'#13#10#13#10'Request not accepted by Handler'; - HandlerNotFoundMessage = 'HTTP/1.1 404 Not Found'#13#10#13#10'No Handler registered for this request'; - - -implementation - -type - { Protocol specific types } - TWebsocketFrameHeader = record - Fin: boolean; - OPCode: TWebsocketMessageType; - Mask: boolean; - PayloadLen: byte; - end; - TMaskRec = record - case boolean of - True: (Bytes: array[0..3] of byte); - False: (Key: integer); - end; - TWordRec = record - case boolean of - True: (Bytes: array[0..1] of byte); - False: (Value: word); - end; - -function WordToFrameHeader(Data: word): TWebsocketFrameHeader; inline; -var - wordRec: TWordRec; -begin - wordRec.Value := Data; - Result.Fin := (wordRec.Bytes[0] and 128) = 128; - Result.OPCode := TWebsocketMessageType(wordRec.Bytes[0] and %1111); - Result.Mask := (wordRec.Bytes[1] and 128) = 128; - Result.PayloadLen := wordRec.Bytes[1] and %1111111; -end; - -function boolToBit(b: boolean; Bit: byte): byte; inline; -begin - Result := 0; - if b then - Result := 1 shl Bit; -end; - -function FrameHEaderToWord(const Header: TWebsocketFrameHeader): word; inline; -var - wordRec: TWordRec; -begin - wordRec.Bytes[0] := boolToBit(Header.Fin, 7) or (Ord(Header.OPCode) and %1111); - wordRec.Bytes[1] := boolToBit(Header.Mask, 7) or (Header.PayloadLen and %1111111); - Result := wordRec.Value; -end; - -type - - {Thread Types} - { TWebsocketHandlerThread } - - TWebsocketHandlerThread = class(TPoolableThread) - private - FCommunicator: TWebsocketCommunincator; - FHandler: TThreadedWebsocketHandler; - protected - procedure DoExecute; override; - property Handler: TThreadedWebsocketHandler read FHandler write FHandler; - property Communicator: TWebsocketCommunincator - read FCommunicator write FCommunicator; - end; - - THandlerThreadFactory = specialize TPoolableThreadFactory<TWebsocketHandlerThread>; - THandlerThreadPool = specialize TObjectPool<TWebsocketHandlerThread, - THandlerThreadFactory, THandlerThreadFactory>; - TLockedHandlerThreadPool = specialize TThreadedObject<THandlerThreadPool>; - - { TWebsocketRecieverThread } - - TWebsocketRecieverThread = class(TPoolableThread) - private - FCommunicator: TWebsocketCommunincator; - FStopped: boolean; - protected - procedure DoExecute; override; - procedure Kill; - property Communicator: TWebsocketCommunincator - read FCommunicator write FCommunicator; - end; - - TRecieverThreadFactory = specialize TPoolableThreadFactory<TWebsocketRecieverThread>; - TRecieverThreadPool = specialize TObjectPool<TWebsocketRecieverThread, - TRecieverThreadFactory, TRecieverThreadFactory>; - TLockedRecieverThreadPool = specialize TThreadedObject<TRecieverThreadPool>; - - { TWebsocketHandshakeHandler } - - TWebsocketHandshakeHandler = class - private - FStream: TSocketStream; - FHostMap: TLockedHostMap; - function ReadRequest(var RequestData: TRequestData): boolean; - function GenerateAcceptingKey(const Key: string): string; - public - procedure PerformHandshake; - constructor Create(AStream: TSocketStream; AHostMap: TLockedHostMap); - end; - - { TAcceptingThread } - - TAcceptingThread = class(TPoolableThread) - private - FHandshakeHandler: TWebsocketHandshakeHandler; - protected - procedure DoExecute; override; - - property HandshakeHandler: TWebsocketHandshakeHandler - read FHandshakeHandler write FHandshakeHandler; - end; - - TAcceptingThreadFactory = specialize TPoolableThreadFactory<TAcceptingThread>; - TAcceptingThreadPool = specialize TObjectPool<TAcceptingThread, - TAcceptingThreadFactory, TAcceptingThreadFactory>; - TLockedAcceptingThreadPool = specialize TThreadedObject<TAcceptingThreadPool>; - -var - RecieverThreadPool: TLockedRecieverThreadPool; - HandlerThreadPool: TLockedHandlerThreadPool; - AcceptingThreadPool: TLockedAcceptingThreadPool; - -function CreateAcceptingThread( - const AHandshakeHandler: TWebsocketHandshakeHandler): TAcceptingThread; inline; -var - pool: TAcceptingThreadPool; -begin - pool := AcceptingThreadPool.Lock; - try - Result := pool.GetObject; - Result.HandshakeHandler := AHandshakeHandler; - Result.Restart; - finally - AcceptingThreadPool.Unlock; - end; -end; - -function CreateHandlerThread(const ACommunicator: TWebsocketCommunincator; - const AHandler: TThreadedWebsocketHandler): TWebsocketHandlerThread; inline; -var - pool: THandlerThreadPool; -begin - pool := HandlerThreadPool.Lock; - try - Result := pool.GetObject; - Result.Communicator := ACommunicator; - Result.Handler := AHandler; - Result.Restart; - finally - HandlerThreadPool.Unlock; - end; -end; - -function CreateRecieverThread(const ACommunicator: TWebsocketCommunincator): -TWebsocketRecieverThread; inline; -var - pool: TRecieverThreadPool; -begin - pool := RecieverThreadPool.Lock; - try - Result := pool.GetObject; - Result.Communicator := ACommunicator; - Result.Restart; - finally - RecieverThreadPool.Unlock; - end; -end; - -{*------------------------------------------------------------------------------ - * extension of htons and htonl for qwords (ll: long long from C) - *-----------------------------------------------------------------------------} -function htonll(host: QWord): QWord; inline; -begin -{$ifdef FPC_BIG_ENDIAN} - Result := host; -{$else} - Result := SwapEndian(host); -{$endif} -end; - -function ntohll(net: QWord): QWord; inline; -begin -{$ifdef FPC_BIG_ENDIAN} - Result := net; -{$else} - Result := SwapEndian(net); -{$endif} -end; - -{ TRequestHeaders } - -function DoHeaderKeyCompare(const Key1, Key2: string): integer; -begin - // Headers are case insensetive - Result := CompareStr(Key1.ToLower, Key2.ToLower); -end; - -{ EWebsocketError } - -constructor EWebsocketError.Create(const msg: string; ACode: integer); -begin - inherited Create(msg); - FCode := ACode; -end; - -{ TLockedSocketStream } - -function TLockedSocketStream.isOpen: boolean; -begin - Lock; - try - Result := Assigned(FStream); - finally - Unlock; - end; -end; - -constructor TLockedSocketStream.Create(const AStream: TSocketStream); -begin - FLocalAddress.Address := NetAddrToStr(AStream.LocalAddress.sin_addr); - FLocalAddress.Port := AStream.LocalAddress.sin_port; - FRemoteAddress.Address := NetAddrToStr(AStream.RemoteAddress.sin_addr); - FRemoteAddress.Port := AStream.LocalAddress.sin_port; - FStream := AStream; - InitCriticalSection(FLock); -end; - -destructor TLockedSocketStream.Destroy; -begin - CloseStream; - DoneCriticalsection(FLock); - inherited Destroy; -end; - -function TLockedSocketStream.Lock: TSocketStream; -begin - EnterCriticalsection(FLock); - Result := FStream; -end; - -procedure TLockedSocketStream.Unlock; -begin - LeaveCriticalsection(FLock); -end; - -procedure TLockedSocketStream.CloseStream; -begin - Lock; - try - FreeAndNil(FStream); - finally - Unlock; - end; -end; - -procedure TAcceptingThread.DoExecute; -begin - FHandshakeHandler.PerformHandshake; -end; - -{ TWebsocketHandlerThread } - -procedure TWebsocketHandlerThread.DoExecute; -var - Recv: TWebsocketRecieverThread; -begin - Recv := CreateRecieverThread(FCommunicator); - try - try - FHandler.DoHandleCommunication(FCommunicator); - finally - FCommunicator.Close; - FCommunicator.Free; - end; - finally - Recv.Kill; - end; -end; - -{ TWebsocketRecieverThread } - -procedure TWebsocketRecieverThread.DoExecute; -begin - FStopped := False; - while not Terminated and not FStopped and FCommunicator.Open do - begin - FCommunicator.RecieveMessage; - Sleep(1000); - Yield; - end; -end; - -procedure TWebsocketRecieverThread.Kill; -begin - FStopped := True; -end; - -{ TWebsocketCommunincator } - -function TWebsocketCommunincator.GenerateMask: integer; -begin - Result := -1; - if FMaskMessages then // Not really secure... - Result := integer(Random(DWord.MaxValue)); -end; - -function TWebsocketCommunincator.GetOpen: boolean; -begin - Result := FStream.Open; -end; - -constructor TWebsocketCommunincator.Create(AStream: TLockedSocketStream; - AMaskMessage: boolean; AssumeMaskedMessages: boolean); -begin - FStream := AStream; - FMaskMessages := AMaskMessage; - FAssumeMaskedMessages := AssumeMaskedMessages; - FMessages := TLockedWebsocketMessageList.Create(TWebsocketMessageList.Create); - FExpectClose := False; -end; - -destructor TWebsocketCommunincator.Destroy; -begin - // Ending communication => Close stream - Close(True); - FStream.Free; - FMessages.Free; - inherited Destroy; -end; - -procedure TWebsocketCommunincator.Close(ForceClose: boolean); -begin - if not Open then - Exit; - if not ForceClose then - begin - WriteMessage(wmtClose).Free; - FExpectClose := True; - Exit; - end; - if Assigned(FOnClose) then - FOnClose(Self); - FStream.CloseStream; -end; - -procedure TWebsocketCommunincator.RecieveMessage; - - procedure ReadData(var buffer; const len: int64); - var - ToRead: longint; - Read: longint; - LeftToRead: int64; - TotalRead: int64; - oldTO: integer; - Stream: TSocketStream; - const - IOTimeoutError = 11; - WaitingTime = 10; - begin - TotalRead := 0; - repeat - // how much we are trying to read at a time - LeftToRead := len - TotalRead; - if LeftToRead > ToRead.MaxValue then - ToRead := ToRead.MaxValue - else - ToRead := LeftToRead; - // Reading - - Stream := FStream.Lock; - try - if not Assigned(Stream) then - begin - raise EWebsocketReadError.Create('Socket already closed', 0); - end; - oldTO := Stream.IOTimeout; - Stream.IOTimeout := 1; - try - Read := Stream.Read(PByte(@buffer)[TotalRead], ToRead); - if Read < 0 then - begin - // on Error - if Stream.LastError <> IOTimeoutError then - raise EWebsocketReadError.Create('error reading from stream', - Stream.LastError); - end - else - begin - // Increase the amount to read - TotalRead += Read; - end; - finally - Stream.IOTimeout := oldTO; - end; - finally - FStream.Unlock; - end; - if (TotalRead < len) and (Read <> ToRead) then // not finished, wait for some data - Sleep(WaitingTime); - until TotalRead >= len; - end; - - procedure AddMessageToList(Message: TWebsocketMessage); - var - lst: TWebsocketMessageList; - begin - if Assigned(Message) then - begin - lst := FMessages.Lock; - try - lst.Add(Message); - finally - FMessages.Unlock; - end; - if Assigned(FOnRecieveMessage) then - begin - FOnRecieveMessage(Self); - end; - end; - end; - - function ProcessSpecialMessages(messageType: TWebsocketMessageType; - var buffer; const buffLen: int64): boolean; - var - str: UTF8String; - begin - Result := True; - case messageType of - wmtClose: - begin - // If we didn't send the original close, return the message - if not FExpectClose then - WriteMessage(wmtClose).Free; - // Close the stream (true to not send a message - Close(True); - end; - wmtPing: - begin - // On ping send pong, with same content - with WriteMessage(wmtPong) do - try - if buffLen > 0 then - Write(PByte(@buffer)[0], buffLen); - finally - Free; - end; - end; - wmtPong: - begin - // lift pong message to message queue, so user can handle it - SetLength(str, buffLen); - if buffLen > 0 then - Move(buffer, str[1], buffLen); - AddMessageToList(TWebsocketPongMessage.Create(str)); - end - else - Result := False; - end; - end; - -var - Header: TWebsocketFrameHeader; - len64: int64; - len16: word; - len: int64; - MaskRec: TMaskRec; - buffer: TBytes; - i: int64; - Message: TWebsocketMessage; - outputStream: TMemoryStream; - messageType: TWebsocketMessageType; - str: UTF8String; - w: word; -begin - Message := nil; - outputStream := TMemoryStream.Create; - try - try - repeat - if not Open then - Exit; - ReadData(w, 2); - Header := WordToFrameHeader(w); - if Header.OPCode <> wmtContinue then - messageType := TWebsocketMessageType(Header.OPCode); - if Header.PayloadLen < 126 then - len := Header.PayloadLen - else if Header.PayloadLen = 126 then - begin - ReadData(len16, SizeOf(len16)); - len := NToHs(len16); - end - else - begin - ReadData(len64, SizeOf(len64)); - len := ntohll(len64); - end; - if Header.Mask then - begin - ReadData(MaskRec.Key, SizeOf(MaskRec.Key)); - end - else if FAssumeMaskedMessages then - begin - Close(True); - Exit; - end; - // Read payload - SetLength(buffer, len); - if len > 0 then - begin - ReadData(buffer[0], len); - if Header.Mask then - begin - // As this is 64 bit, to be 32 bit compatible we can't use a for loop - i := 0; - while i < len do - begin - buffer[i] := buffer[i] xor MaskRec.Bytes[i mod 4]; - Inc(i); - end; - end; - end; - // Handling special messages - if ProcessSpecialMessages(messageType, PByte(buffer)^, len) then - Continue - else - begin - // This is a dataframe, so save data for concatination of fragments - if len > 0 then - outputStream.WriteBuffer(buffer[0], len); - end; - until Header.Fin; - // Read whole message - outputStream.Seek(0, soBeginning); - case messageType of - wmtString: - begin - SetLength(str, outputStream.Size); - outputStream.ReadBuffer(str[1], outputStream.Size); - Message := TWebsocketStringMessage.Create(str); - end; - wmtBinary: - begin - SetLength(buffer, outputStream.Size); - outputStream.ReadBuffer(buffer[0], outputStream.Size); - Message := TWebsocketBinaryMessage.Create(buffer); - end; - end; - AddMessageToList(Message); - finally - outputStream.Free; - end; - except - On e: EWebsocketReadError do - begin - if e.Code = 0 then - begin - // Stream has been closed - Close(True); - end; - end; - end; -end; - -function TWebsocketCommunincator.WriteMessage(MessageType: TWebsocketMessageType; - MaxFrameLength: int64): TWebsocketMessageStream; -begin - Result := TWebsocketMessageStream.Create(FStream, MessageType, - MaxFrameLength, generateMask); -end; - -function TWebsocketCommunincator.GetUnprocessedMessages( - const MsgList: TWebsocketMessageOwnerList): integer; -var - lst: TWebsocketMessageList; - m: TWebsocketMessage; -begin - lst := FMessages.Lock; - try - Result := lst.Count; - for m in lst do - MsgList.Add(m); - lst.Clear; - finally - FMessages.Unlock; - end; -end; - -{ TWebsocketMessageStream } - -procedure TWebsocketMessageStream.WriteDataFrame(Finished: boolean); -var - Header: TWebsocketFrameHeader; - i: int64; - MaskRec: TMaskRec; - Stream: TSocketStream; -begin - Stream := FDataStream.Lock; - try - if not Assigned(Stream) then - begin - raise EWebsocketWriteError.Create('Stream already closed', 0); - end; - try - Header.Fin := Finished; - Header.Mask := (FMaskKey <> -1); - if FFirstWrite then - Header.OPCode := FMessageType - else - Header.OPCode := wmtContinue; - // Compute size - if FCurrentLen < 126 then - Header.PayloadLen := FCurrentLen - else if FCurrentLen <= word.MaxValue then - Header.PayloadLen := 126 - else - Header.PayloadLen := 127; - // Write header - Stream.WriteWord(FrameHEaderToWord(Header)); - // Write size if it exceeds 125 - if (FCurrentLen > 125) then - begin - if (FCurrentLen <= word.MaxValue) then - Stream.WriteWord(htons(word(FCurrentLen))) - else - Stream.WriteQWord(htonll(QWord(FCurrentLen))); - end; - if Header.Mask then - begin - // If we use a mask - MaskRec.Key := FMaskKey; - // First: Transmit mask Key - Stream.WriteBuffer(MaskRec.Bytes[0], 4); - // 2. Encode Message - // As this is 64 bit, to be 32 bit compatible we can't use a for loop - i := 0; - while i < FCurrentLen do - begin - FBuffer[i] := FBuffer[i] xor MaskRec.Bytes[i mod 4]; - Inc(i); - end; - end; - // Write Message payload - Stream.WriteBuffer(FBuffer[0], FCurrentLen); - // Reset state for next data - FCurrentLen := 0; - except - on E: EWriteError do - raise EWebsocketWriteError.Create(e.Message, Stream.LastError); - end; - finally - FDataStream.Unlock; - end; -end; - -constructor TWebsocketMessageStream.Create(const ADataStream: TLockedSocketStream; - AMessageType: TWebsocketMessageType; AMaxFrameLen: int64; AMaskKey: integer); -begin - FDataStream := ADataStream; - FMaxFrameSize := AMaxFrameLen; - FMessageType := AMessageType; - SetLength(FBuffer, AMaxFrameLen); - FCurrentLen := 0; - FFirstWrite := True; - FMaskKey := AMaskKey; -end; - -destructor TWebsocketMessageStream.Destroy; -begin - WriteDataFrame(True); - inherited Destroy; -end; - -function TWebsocketMessageStream.Seek(Offset: longint; Origin: word): longint; -begin - // We cant seek - Result := 0; -end; - -function TWebsocketMessageStream.Read(var Buffer; Count: longint): longint; -begin - // Write only stream - Result := 0; -end; - -function TWebsocketMessageStream.Write(const Buffer; Count: longint): longint; -var - ToWrite: integer; -begin - while FCurrentLen + Count > FMaxFrameSize do - begin - // Doesn't fit into one dataframe - // So we split it up into multiple - ToWrite := FMaxFrameSize - FCurrentLen; - Move(Buffer, FBuffer[FCurrentLen], ToWrite); - FCurrentLen := FMaxFrameSize; - WriteDataFrame(False); - // Now FCurrentLen should be 0 again - // Only decrese the count - Dec(Count, ToWrite); - end; - Move(Buffer, FBuffer[FCurrentLen], Count); - FCurrentLen += Count; - Result := Count; -end; - -{ TWebsocketMessage } - -constructor TWebsocketMessage.Create(const AMessageType: TWebsocketMessageType); -begin - FMessageType := AMessageType; -end; - -{ TWebsocketStringMessage } - -constructor TWebsocketStringMessage.Create(const AData: UTF8String); -begin - inherited Create(wmtString); - FData := AData; - SetLength(FData, Length(FData)); -end; - -{ TWebsocketPongMessage } - -constructor TWebsocketPongMessage.Create(const AData: UTF8String); -begin - inherited Create(wmtPong); - FData := AData; - SetLength(FData, Length(FData)); -end; - -{ TWebsocketBinaryMessage } - -constructor TWebsocketBinaryMessage.Create(const AData: TBytes); -begin - inherited Create(wmtBinary); - FData := AData; - SetLength(FData, Length(FData)); -end; - -{ THostHandler } - -constructor THostHandler.Create(const AHost: string; FreeObjects: boolean); -begin - FHost := AHost; - inherited Create(FreeObjects); -end; - -{ TWebsocketHandler } - -function TWebsocketHandler.Accept(const ARequest: TRequestData; - const ResponseHeaders: TStrings): boolean; -begin - Result := True; -end; - -procedure TWebsocketHandler.HandleCommunication( - ACommunicator: TWebsocketCommunincator); -begin - // No implementation; To be overriden -end; - -procedure TThreadedWebsocketHandler.HandleCommunication( - ACommunicator: TWebsocketCommunincator); -begin - CreateHandlerThread(ACommunicator, Self); -end; - -procedure TThreadedWebsocketHandler.DoHandleCommunication( - ACommunication: TWebsocketCommunincator); -begin - // No implementation; To be overriden -end; - -{ THostMap } - -constructor THostMap.Create; -begin - inherited Create(True); -end; - -procedure THostMap.AddHost(const AHost: THostHandler); -begin - Objects[AHost.FHost] := AHost; -end; - -{ TLockedHostMap } - -constructor TLockedHostMap.Create; -begin - inherited Create(THostMap.Create); -end; - -procedure TRequestHeaders.Parse(const HeaderString: string); -var - sl: TStringList; - s: string; - p: integer; -begin - sl := TStringList.Create; - try - sl.TextLineBreakStyle := tlbsCRLF; - sl.Text := HeaderString; - for s in sl do - begin - // Use sl.Values instead? - p := s.IndexOf(':'); - if p > 0 then - Self.KeyData[s.Substring(0, p).ToLower] := s.Substring(p + 1).Trim; - end; - finally - sl.Free; - end; -end; - -constructor TRequestHeaders.Create; -begin - inherited Create; - Self.OnKeyCompare := @DoHeaderKeyCompare; - // Binary search => faster access - Self.Sorted := True; -end; - -{ TWebsocketHandshakeHandler } - -function TWebsocketHandshakeHandler.ReadRequest(var RequestData: TRequestData): boolean; -var - method: string; - proto: string; - headerstr: string; - upg: string; - conn: string; - version: string; -begin - Result := False; - // Check if this is HTTP by checking the first line - // Method GET is required - SetLength(method, 4); - FStream.ReadBuffer(method[1], 4); - if method <> 'GET ' then - begin - // Not GET - Exit; - end; - // Read path and HTTP version - FStream.ReadTo(' ', RequestData.Path); - FStream.ReadTo(#13#10, proto, 10); - RequestData.Path := RequestData.Path.TrimRight; - proto := proto.TrimRight.ToLower; - if not proto.StartsWith('http/') then - begin - // Only accept http/1.1 - Exit; - end; - if not proto.EndsWith('1.1') then - begin - // non 1.1 version: return forbidden - Exit; - end; - // Headers are separated by 2 newlines (CR+LF) - FStream.ReadTo(#13#10#13#10, headerstr, 2048); - RequestData.Headers.Parse(headerstr.Trim); - if not (RequestData.Headers.TryGetData('Upgrade', upg) and - RequestData.Headers.TryGetData('Connection', conn) and - RequestData.Headers.TryGetData('Sec-WebSocket-Key', RequestData.Key) and - (upg = 'websocket') and (conn.Contains('Upgrade'))) then - begin - // Seems to be a normal HTTP request, we only handle websockets - Exit; - end; - // How to handle this? - if not RequestData.Headers.TryGetData('Sec-WebSocket-Version', version) then - version := ''; - if not RequestData.Headers.TryGetData('Host', RequestData.Host) then - RequestData.Host := ''; - Result := True; -end; - -function TWebsocketHandshakeHandler.GenerateAcceptingKey(const Key: string): string; -var - concatKey: string; - keyHash: TSHA1Digest; - OutputStream: TStringStream; - b64Encoder: TBase64EncodingStream; -const - WebsocketMagicString = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'; -begin - // Key = Base64(SHA1(Key + MagicString)) - concatKey := Key + WebsocketMagicString; - keyHash := SHA1String(concatKey); - OutputStream := TStringStream.Create(''); - try - b64Encoder := TBase64EncodingStream.Create(OutputStream); - try - b64Encoder.WriteBuffer(keyHash[low(keyHash)], Length(keyHash)); - b64Encoder.Flush; - Result := OutputStream.DataString; - finally - b64Encoder.Free; - end; - finally - OutputStream.Free; - end; -end; - -procedure TWebsocketHandshakeHandler.PerformHandshake; -var - RequestData: TRequestData; - hm: THostMap; - hh: THostHandler; - sh: TWebsocketHandler; - ResponseHeaders: TStringList; - i: integer; - HandsakeResponse: TStringList; - Comm: TWebsocketCommunincator; -begin - try - RequestData.Headers := TRequestHeaders.Create; - try - // Reqding request - try - if not ReadRequest(RequestData) then - begin - FStream.WriteRaw(MalformedRequestMessage); - FStream.Free; - Exit; - end; - except - on E: EReadError do - begin - FStream.WriteRaw(MalformedRequestMessage); - FStream.Free; - Exit; - end; - end; - // Getting responsible handler - hm := FHostMap.Lock; - try - hh := hm.Objects[RequestData.Host]; - if not Assigned(hh) then - begin - FStream.WriteRaw(HandlerNotFoundMessage); - FStream.Free; - Exit; - end; - sh := hh.Objects[RequestData.Path]; - if not Assigned(sh) then - begin - FStream.WriteRaw(HandlerNotFoundMessage); - FStream.Free; - Exit; - end; - finally - FHostMap.Unlock; - end; - // Checking if handler wants to accept - ResponseHeaders := TStringList.Create; - try - ResponseHeaders.NameValueSeparator := ':'; - if not sh.Accept(RequestData, ResponseHeaders) then - begin - FStream.WriteRaw(ForbiddenRequestMessage); - FStream.Free; - Exit; - end; - // Neseccary headers - ResponseHeaders.Values['Connection'] := 'Upgrade'; - ResponseHeaders.Values['Upgrade'] := 'websocket'; - ResponseHeaders.Values['Sec-WebSocket-Accept'] := - GenerateAcceptingKey(RequestData.Key); - // Generating response - HandsakeResponse := TStringList.Create; - try - HandsakeResponse.TextLineBreakStyle := tlbsCRLF; - HandsakeResponse.Add('HTTP/1.1 101 Switching Protocols'); - for i := 0 to ResponseHeaders.Count - 1 do - HandsakeResponse.Add('%s: %s'.Format([ResponseHeaders.Names[i], - ResponseHeaders.ValueFromIndex[i]])); - HandsakeResponse.Add(''); - - FStream.WriteRaw(HandsakeResponse.Text); - finally - HandsakeResponse.Free; - end; - finally - ResponseHeaders.Free; - end; - finally - RequestData.Headers.Free; - end; - Comm := TWebsocketCommunincator.Create(TLockedSocketStream.Create(FStream), - False, True); - finally - // Not needed anymore, we can now die in piece. - // All information requier for the rest is now on the stack - Self.Free; - end; - sh.HandleCommunication(Comm); -end; - -constructor TWebsocketHandshakeHandler.Create(AStream: TSocketStream; - AHostMap: TLockedHostMap); -begin - FHostMap := AHostMap; - FStream := AStream; -end; - -{ TWebSocketServer } - -procedure TWebSocketServer.DoCreate; -begin - FSocket.OnConnect := @HandleConnect; - FHostMap := TLockedHostMap.Create; - FFreeHandlers := True; - FAcceptingMethod := samDefault; -end; - -procedure TWebSocketServer.HandleConnect(Sender: TObject; Data: TSocketStream); -var - HandshakeHandler: TWebsocketHandshakeHandler; - t: TAcceptingThread; -begin - HandshakeHandler := TWebsocketHandshakeHandler.Create(Data, FHostMap); - case AcceptingMethod of - samDefault: - HandshakeHandler.PerformHandshake; - samThreaded: - begin - t := TAcceptingThread.Create(True); - t.DoTerminate := True; - t.FreeOnTerminate := True; - t.HandshakeHandler := HandshakeHandler; - t.Restart; - end; - samThreadPool: - CreateAcceptingThread(HandshakeHandler); - end; -end; - -procedure TWebSocketServer.Start; -begin - FSocket.StartAccepting; -end; - -procedure TWebSocketServer.Stop(DoAbort: boolean); -begin - FSocket.StopAccepting(DoAbort); -end; - -procedure TWebSocketServer.RegisterHandler(const AHost: string; - const APath: string; AHandler: TWebsocketHandler; DefaultHost: boolean; - DefaultPath: boolean); -var - map: THostMap; - hh: THostHandler; -begin - map := FHostMap.Lock; - try - if not map.TryGetObject(AHost, hh) then - begin - hh := THostHandler.Create(AHost, FFreeHandlers); - map.AddHost(hh); - end; - if DefaultHost then - map.DefaultObject := hh; - hh[APath] := AHandler; - if DefaultPath then - hh.DefaultObject := AHandler; - finally - FHostMap.Unlock; - end; -end; - -destructor TWebSocketServer.Destroy; -begin - Stop(True); - FSocket.Free; - FHostMap.Free; - inherited Destroy; -end; - -constructor TWebSocketServer.Create(const AHost: string; const APort: word; - AHandler: TSocketHandler); -begin - FSocket := TInetServer.Create(AHost, APort, AHandler); - DoCreate; -end; - -constructor TWebSocketServer.Create(const APort: word); -begin - FSocket := TInetServer.Create(APort); - DoCreate; -end; - -initialization - AcceptingThreadPool := TLockedAcceptingThreadPool.Create(TAcceptingThreadPool.Create); - HandlerThreadPool := TLockedHandlerThreadPool.Create(THandlerThreadPool.Create); - RecieverThreadPool := TLockedRecieverThreadPool.Create(TRecieverThreadPool.Create); - -finalization - AcceptingThreadPool.Free; - RecieverThreadPool.Free; - HandlerThreadPool.Free; - -end. diff --git a/websockets.lpk b/websockets.lpk new file mode 100644 index 0000000..14f3d49 --- /dev/null +++ b/websockets.lpk @@ -0,0 +1,55 @@ +<?xml version="1.0" encoding="UTF-8"?> +<CONFIG> + <Package Version="5"> + <Name Value="Websockets"/> + <Type Value="RunAndDesignTime"/> + <CompilerOptions> + <Version Value="11"/> + <SearchPaths> + <OtherUnitFiles Value="src"/> + <UnitOutputDirectory Value="lib/$(TargetCPU)-$(TargetOS)/"/> + </SearchPaths> + <CodeGeneration> + <SmartLinkUnit Value="True"/> + <Optimizations> + <OptimizationLevel Value="3"/> + </Optimizations> + </CodeGeneration> + <Linking> + <Debugging> + <GenerateDebugInfo Value="False"/> + </Debugging> + </Linking> + </CompilerOptions> + <Files> + <Item> + <Filename Value="src/websocketserver.pas"/> + <UnitName Value="WebSocketServer"/> + </Item> + <Item> + <Filename Value="src/wsutils.pas"/> + <UnitName Value="wsutils"/> + </Item> + <Item> + <Filename Value="src/wsmessages.pas"/> + <UnitName Value="wsmessages"/> + </Item> + <Item> + <Filename Value="src/wsstream.pas"/> + <UnitName Value="wsstream"/> + </Item> + </Files> + <RequiredPkgs> + <Item> + <PackageName Value="FCL"/> + </Item> + </RequiredPkgs> + <UsageOptions> + <UnitPath Value="$(PkgOutDir)"/> + </UsageOptions> + <PublishOptions> + <Version Value="2"/> + <UseFileFilters Value="True"/> + </PublishOptions> + </Package> +</CONFIG> diff --git a/websockets.pas b/websockets.pas new file mode 100644 index 0000000..3973760 --- /dev/null +++ b/websockets.pas @@ -0,0 +1,21 @@ +{ This file was automatically created by Lazarus. Do not edit! + This source is only used to compile and install the package. + } + +unit Websockets; + +{$warn 5023 off : no warning about unused units} +interface + +uses + WebSocketServer, wsutils, wsmessages, wsstream, LazarusPackageIntf; + +implementation + +procedure Register; +begin +end; + +initialization + RegisterPackage('Websockets', @Register); +end.