客户端为C++语言,服务端为Pascal语言——网络模型之消息异步;(帖源码)
1,消息异步,顾名思义,是基于消息传递信号、并响应相对应事件的网络模型。因为我只需要客户端仅仅听从于服务端一个人的命令,所以C++端的模型为阻塞模式;
2,两端采用类封装,简化代码流程;
3,代码封装成流,以便于接收命令码、错误码和数据;以保证数据准确;
客户端(C++):
//头文件:CSocket.h
//需要包含Winsock库
//#include <WinSock2.h>
//#pragma comment(lib,"ws2_32.lib")
//利用类简化流程,封装成流,以便于接收命令码和错误码还有数据
/**
 * Thin wrapper around a Winsock TCP socket handle.
 *
 * Besides the connection life-cycle calls it offers a framed "stream"
 * exchange: each message carries a command code, an error code and a
 * payload held in a CStream. Every method that returns int reports 0 on
 * success and a non-zero (Winsock) error code on failure.
 */
class CSocket
{
public:
    /** Creates an object that does not yet refer to a socket. */
    CSocket();

    /**
     * Creates an object around an existing socket handle.
     * NOTE(review): presumably the object then owns `s` - confirm against
     * the definition.
     */
    CSocket(SOCKET s);

    /** Closes the socket if it is still open. */
    ~CSocket(void);

    /** Creates the underlying TCP socket. */
    int Create();

    /** Closes the underlying socket; does nothing when none is open. */
    int Close();

    /**
     * Connects to a server.
     * @param host  dotted-decimal IPv4 address string
     * @param port  TCP port in host byte order
     */
    int Connect(char* host, WORD port);

    /**
     * Sends one framed message.
     * @param ucmd  command code
     * @param uerr  error code
     * @param buf   payload to send
     */
    int SoSendStream(DWORD ucmd, DWORD uerr, CStream* buf);

    /**
     * Receives one framed message.
     * @param ucmd  receives the command code
     * @param uerr  receives the error code
     * @param buf   receives the payload (resized to fit)
     */
    int SoRecvStream(DWORD& ucmd, DWORD& uerr, CStream* buf);

private:
    /** Single send() call; `cb` receives the number of bytes written. */
    int SoSend(char* buf, int len, int& cb);

    /** Single recv() call; `cb` receives the number of bytes read. */
    int SoRecv(char* buf, int len, int& cb);

    /** Loops over SoSend until exactly `len` bytes have been sent. */
    int SoSendBytes(char* buf, int len);

    /** Loops over SoRecv until exactly `len` bytes have been received. */
    int SoRecvBytes(char* buf, int len);

    SOCKET m_s; ///< socket handle, INVALID_SOCKET when not open
};
CPP源码:
#include "StdAfx.h"
#include "CSocket.h"

// Blocking Winsock client socket.
//
// Framed message layout used by SoSendStream / SoRecvStream:
//   payload size | command code | error code | payload bytes
// The three header fields are sent in host byte order, exactly as they are
// laid out in memory.
//
// All int-returning methods return 0 on success and a Winsock error code
// on failure.

// Creates an object that does not refer to any socket yet.
CSocket::CSocket()
    : m_s(INVALID_SOCKET)
{
}

// Wraps an already-created socket handle. The handle is closed by Close()
// or by the destructor, like one obtained through Create().
CSocket::CSocket(SOCKET s)
    : m_s(s)
{
}

// Closes the socket if it is still open; a failure is only logged because
// a destructor cannot report it.
CSocket::~CSocket()
{
    if (int e = Close())
    {
        printf("%d\r\n", e);
    }
}

// Creates the TCP socket.
int CSocket::Create()
{
    m_s = socket(AF_INET, SOCK_STREAM, 0);
    if (INVALID_SOCKET == m_s)
    {
        return WSAGetLastError();
    }
    return 0;
}

// Closes the socket. Does nothing (and returns 0) when no socket is open.
// On failure the handle is kept so the caller can inspect or retry.
int CSocket::Close()
{
    if (INVALID_SOCKET == m_s)
    {
        return 0;
    }
    if (closesocket(m_s))
    {
        // Capture the code before printf() gets a chance to change it.
        const int e = WSAGetLastError();
        printf("%d\r\n", e);
        return e;
    }
    m_s = INVALID_SOCKET;
    return 0;
}

// Connects to host:port and then applies 300 s send/receive timeouts.
//   host - dotted-decimal IPv4 address
//   port - TCP port in host byte order
// Returns WSAEFAULT for a null host and WSAEINVAL for an unparsable
// address; otherwise the connect() error, or 0 on success. Failing to set
// a timeout is logged but does not fail the call.
int CSocket::Connect(char* host, WORD port)
{
    if (NULL == host)
    {
        return WSAEFAULT;
    }

    const unsigned long ip = inet_addr(host);
    if (INADDR_NONE == ip)
    {
        return WSAEINVAL;
    }

    SOCKADDR_IN addrSrv = { 0 }; // also clears sin_zero
    addrSrv.sin_family = AF_INET;
    addrSrv.sin_addr.S_un.S_addr = ip;
    addrSrv.sin_port = htons(port);

    if (connect(m_s, (SOCKADDR*)&addrSrv, sizeof(addrSrv)))
    {
        return WSAGetLastError();
    }

    DWORD timeout = 300 * 1000; // milliseconds
    if (setsockopt(m_s, SOL_SOCKET, SO_RCVTIMEO, (char*)&timeout, sizeof(timeout)))
    {
        printf("%d\r\n", WSAGetLastError());
    }
    if (setsockopt(m_s, SOL_SOCKET, SO_SNDTIMEO, (char*)&timeout, sizeof(timeout)))
    {
        printf("%d\r\n", WSAGetLastError());
    }
    return 0;
}

// One recv() call. `cb` receives recv()'s return value.
// Returns 0 when at least one byte arrived, the Winsock error when recv()
// failed, and WSAECONNRESET when the peer closed the connection.
int CSocket::SoRecv(char* buf, int len, int& cb)
{
    cb = recv(m_s, buf, len, 0);
    if (cb > 0)
    {
        return 0;
    }
    if (cb < 0)
    {
        const int e = WSAGetLastError();
        printf("%d\r\n", e);
        // Never report success here: callers add `cb` to their offset.
        return e ? e : WSAECONNRESET;
    }
    return WSAECONNRESET;
}

// One send() call. `cb` receives send()'s return value.
// Returns 0 when at least one byte was written, the Winsock error when
// send() failed, and WSAECONNRESET when nothing could be written.
int CSocket::SoSend(char* buf, int len, int& cb)
{
    cb = send(m_s, buf, len, 0);
    if (cb > 0)
    {
        return 0;
    }
    if (cb < 0)
    {
        const int e = WSAGetLastError();
        printf("%d\r\n", e);
        return e ? e : WSAECONNRESET;
    }
    return WSAECONNRESET;
}

// Sends exactly `len` bytes, looping over partial writes.
int CSocket::SoSendBytes(char* buf, int len)
{
    int off = 0;
    while (off < len)
    {
        int cb = 0;
        if (int e = SoSend(buf + off, len - off, cb))
        {
            return e;
        }
        off += cb;
    }
    return 0;
}

// Receives exactly `len` bytes, looping over partial reads.
int CSocket::SoRecvBytes(char* buf, int len)
{
    int off = 0;
    while (off < len)
    {
        int cb = 0;
        if (int e = SoRecv(buf + off, len - off, cb))
        {
            return e;
        }
        off += cb;
    }
    return 0;
}

// Sends one framed message: size, command, error, then the payload.
// Returns WSAEFAULT when `buf` is null.
int CSocket::SoSendStream(DWORD ucmd, DWORD uerr, CStream* buf)
{
    if (NULL == buf)
    {
        return WSAEFAULT;
    }
    if (int e = SoSendBytes((char*)&buf->m_size, sizeof(buf->m_size)))
    {
        return e;
    }
    if (int e = SoSendBytes((char*)&ucmd, sizeof(ucmd)))
    {
        return e;
    }
    if (int e = SoSendBytes((char*)&uerr, sizeof(uerr)))
    {
        return e;
    }
    if (int e = SoSendBytes((char*)buf->m_p, buf->m_size))
    {
        return e;
    }
    return 0;
}

// Receives one framed message into `ucmd`, `uerr` and `buf`.
// Returns WSAEFAULT when `buf` is null, WSAEMSGSIZE when the peer announces
// a negative payload size, and the SetSize() error when the buffer cannot
// be resized. In the last two cases nothing is written to `buf`.
int CSocket::SoRecvStream(DWORD& ucmd, DWORD& uerr, CStream* buf)
{
    if (NULL == buf)
    {
        return WSAEFAULT;
    }

    int size = 0;
    if (int e = SoRecvBytes((char*)&size, sizeof(size)))
    {
        return e;
    }
    // Debug builds flag anything outside the expected range; release builds
    // still must not hand a negative length to SetSize().
    DBG_ASSERT((size >= 0) && (size < 1024 * 1024), 0);
    if (size < 0)
    {
        return WSAEMSGSIZE;
    }

    if (int e = SoRecvBytes((char*)&ucmd, sizeof(ucmd)))
    {
        return e;
    }
    if (int e = SoRecvBytes((char*)&uerr, sizeof(uerr)))
    {
        return e;
    }
    if (int e = buf->SetSize(size))
    {
        // Receiving into a buffer that failed to resize would write through
        // a stale pointer/size pair, so stop here.
        printf("%d\r\n", e);
        return e;
    }
    if (int e = SoRecvBytes((char*)buf->m_p, size))
    {
        return e;
    }
    return 0;
}
服务端(Pascal):
unit UnitXSocket;

// Message-driven (WSAAsyncSelect) TCP server sockets.
//
// A hidden window (TXSockWnd) receives WM_SOCKET notifications from Winsock
// and dispatches them to the TXSock object that owns the socket handle.
// Accepted connections (TXSocket) exchange framed messages:
//   4-byte payload size | 4-byte command | 4-byte error | payload
// Complete incoming messages are queued in FListR, outgoing ones in FListS.

interface

uses
  Winapi.Windows, Winapi.Messages, WinSock, System.Types, System.SysUtils,
  System.Variants, System.SyncObjs, System.Classes, Vcl.Graphics, Vcl.Controls,
  Vcl.Forms, Vcl.Dialogs, UnitXStream;

const
  WM_SOCKET = (WM_USER + 1);        // Winsock -> hidden window
  WM_SOCKET_ACCEPT = (WM_USER + 2); // hidden window -> notify window, LPARAM = new TXSocket
  WM_SOCKET_RECV = (WM_USER + 3);   // hidden window -> notify window, LPARAM = TXSocket
  WM_SOCKET_CLOSE = (WM_USER + 4);  // hidden window -> notify window, LPARAM = TXSock

type
  // Common base of the listening socket and of accepted connections.
  TXSock = class
  public
    constructor Create;
    destructor Destroy; override;
  public
    // Event hooks called by TXSockWnd; the base versions do nothing.
    procedure OnAccept(err: integer); virtual;
    procedure OnRecv(err: integer); virtual;
    procedure OnSend(err: integer); virtual;
    procedure OnClose(err: integer); virtual;
  public
    procedure Close();
  public
    procedure SelectEvent();
  public
    Fs: TSocket;              // socket handle, INVALID_SOCKET when closed
  public
    FListR: TList;            // queue of received streams
    FListS: TList;            // queue of streams waiting to be sent
    FLock: TCriticalSection;  // guards changes to FListR / FListS
  private
    FEnableNotify: BOOLEAN;   // when TRUE, OnRecv posts WM_SOCKET_RECV
  protected
    FRef: integer;            // reference count; -1 marks the listening socket
  end;

  // Owner of the hidden window that receives WM_SOCKET.
  TXSockWnd = class
  public
    constructor Create;
    destructor Destroy; override;
  public
    function Run(): integer;
    procedure SetNotifyWnd(wnd: Hwnd);
  private
    class function WndProc(wnd: Hwnd; uMsg: UINT; wPrm: wParam; lPrm: LPARAM)
      : LRESULT; stdcall; static;
  private
    procedure OnSocketEvent(s: TSocket; fd: dword; err: dword);
  public
    Fwnd: Hwnd;        // hidden window created by Run
    FwndNotify: Hwnd;  // application window that receives WM_SOCKET_* messages
    FSockList: TList;  // every TXSock currently registered for events
  end;

  // Listening socket (derives from TXSock).
  TXSocketSrvr = class(TXSock)
  public
    function SrvrCreate(ip: string; port: WORD): integer;
    function SrvrClose(): integer;
  public
    procedure OnAccept(err: integer); override;
  end;

  // Accepted connection (derives from TXSock).
  TXSocket = class(TXSock)
  public
    constructor Create;
    destructor Destroy; override;
  public
    procedure Ref();
    procedure Unref();
    procedure SetPrbWnd(wnd: Hwnd; msg: UINT);
  public
    function SoSendStream(ucmd: integer; uerr: integer; buf: TXStream): integer;
    function SoRecvStream(var ucmd: integer; var uerr: integer;
      buf: TXStream): integer;
    function SoPeekStream(var ucmd: integer; var uerr: integer;
      buf: TXStream): integer;
  private
    function SoSend(buf: Pbyte; len: integer; var cb: integer): integer;
    function SoSendBytes(buf: Pbyte; len: integer): integer;
    function SoRecv(buf: Pbyte; len: integer; var cb: integer): integer;
    function SoRecvBytes(buf: Pbyte; len: integer): integer;
  public
    procedure OnRecv(err: integer); override;
    procedure OnSend(err: integer); override;
    procedure OnClose(err: integer); override;
  public
    Ffrm: Pointer;             // opaque pointer set by the application
    FKeepalive: dword;         // initialised to 10 s in Create
    FKeepaliveTimeout: dword;  // initialised to 15 s in Create
    FLastRecvTime: dword;      // tick count taken in Create
    Fmem: TXStream;            // message currently being received, or nil
    Fwnd: Hwnd;                // progress window (0 = none)
    Fmsg: UINT;                // progress message sent to Fwnd
  end;

var
  SockWnd: TXSockWnd;

implementation

const
  XS_HEADER_SIZE = 12;  // size + command + error, 4 bytes each
  XS_SEND_CHUNK = 4096; // largest block queued by one SoSend call
  XS_QUEUE_LIMIT = 4;   // queue depth used for flow control

{ TXSock }

constructor TXSock.Create;
begin
  inherited;
  // Fields start zeroed, and 0 is not INVALID_SOCKET: without this the
  // "is the socket closed?" checks below would never fire on a fresh object.
  Fs := INVALID_SOCKET;
  FListR := TList.Create;
  FListS := TList.Create;
  FLock := TCriticalSection.Create;
  FEnableNotify := TRUE;
  FRef := 0;
end;

destructor TXSock.Destroy;
begin
  FListR.Free;
  FListS.Free;
  FLock.Free;
  inherited;
end;

procedure TXSock.OnAccept(err: integer);
begin
end;

procedure TXSock.OnRecv(err: integer);
begin
end;

procedure TXSock.OnSend(err: integer);
begin
end;

procedure TXSock.OnClose(err: integer);
begin
end;

// Re-registers the socket with WSAAsyncSelect for the events that currently
// make sense: always FD_CLOSE, FD_WRITE while something is queued for
// sending, FD_READ while the receive queue has room.
procedure TXSock.SelectEvent();
var
  fd: dword;
begin
  fd := FD_CLOSE;
  if FListS.Count > 0 then
    fd := fd or FD_WRITE;
  if FListR.Count < XS_QUEUE_LIMIT then
    fd := fd or FD_READ;
  if Fs = INVALID_SOCKET then
    fd := 0;
  // Best effort: a failure here is ignored, as in the original design.
  WSAAsyncSelect(Fs, SockWnd.Fwnd, WM_SOCKET, fd);
end;

// Unregisters and closes the socket, frees all queued streams and, for
// sockets with FRef >= 0, sends WM_SOCKET_CLOSE to the notify window.
// Calling it on an already closed socket asserts and otherwise does nothing.
procedure TXSock.Close();
var
  i: integer;
begin
  if Fs = INVALID_SOCKET then
  begin
    Assert(False);
    exit; // with assertions off: no bogus Winsock calls, no second notification
  end;

  if 0 <> WSAAsyncSelect(Fs, SockWnd.Fwnd, WM_SOCKET, 0) then
    Assert(False);
  SockWnd.FSockList.Remove(self);
  if 0 <> closesocket(Fs) then
    Assert(False);
  Fs := INVALID_SOCKET;

  // The queues are also touched by the SoRecv*/SoSend* callers under FLock.
  FLock.Enter;
  try
    for i := 0 to FListR.Count - 1 do
      TMemoryStream(FListR.Items[i]).Free;
    FListR.Clear;
    for i := 0 to FListS.Count - 1 do
      TMemoryStream(FListS.Items[i]).Free;
    FListS.Clear;
  finally
    FLock.Leave;
  end;

  // SrvrCreate sets FRef to -1, so the listening socket sends no notification.
  if FRef >= 0 then
    SendMessage(SockWnd.FwndNotify, WM_SOCKET_CLOSE, wParam(0), LPARAM(self));
end;

{ TXSockWnd }

constructor TXSockWnd.Create;
var
  wsad: WSADATA;
begin
  inherited;
  Fwnd := 0;
  FwndNotify := 0;
  FSockList := TList.Create;
  if 0 <> WSAStartup(MAKEWORD(1, 1), wsad) then
    Assert(False);
end;

destructor TXSockWnd.Destroy;
begin
  if 0 <> WSACleanup() then
    Assert(False);
  FSockList.Free;
  inherited;
end;

procedure TXSockWnd.SetNotifyWnd(wnd: Hwnd);
begin
  FwndNotify := wnd;
end;

// Registers the window class, creates the hidden window and pumps messages
// until WM_QUIT. Returns the WM_QUIT exit code.
function TXSockWnd.Run(): integer;
var
  wc: WNDCLASSA;
  msg: TMsg;
begin
  ZeroMemory(@wc, sizeof(wc));
  wc.lpfnWndProc := @WndProc;
  wc.lpszClassName := 'XSW';
  if 0 = RegisterClassA(wc) then
  begin
    RTL_GetErrMsg(GetLastError());
    Assert(False);
  end;

  Fwnd := CreateWindow('XSW', 'XSW', 0, 0, 0, 0, 0, 0, 0, 0, nil);
  if Fwnd = 0 then
    Assert(False);

  while GetMessage(msg, 0, 0, 0) do
  begin
    TranslateMessage(msg);
    DispatchMessage(msg);
  end;
  result := msg.wParam;
end;

class function TXSockWnd.WndProc(wnd: Hwnd; uMsg: UINT; wPrm: wParam;
  lPrm: LPARAM): LRESULT;
begin
  case uMsg of
    WM_DESTROY:
      PostQuitMessage(0);
    WM_SOCKET:
      SockWnd.OnSocketEvent(TSocket(wPrm), WSAGETSELECTEVENT(lPrm),
        WSAGETSELECTERROR(lPrm));
  end;
  result := DefWindowProc(wnd, uMsg, wPrm, lPrm);
end;

// Finds the registered TXSock that owns handle `s` and calls the hook that
// matches event `fd`. Events for unknown handles are dropped.
procedure TXSockWnd.OnSocketEvent(s: TSocket; fd: dword; err: dword);
var
  i: integer;
  sock: TXSock;
begin
  for i := 0 to FSockList.Count - 1 do
  begin
    sock := TXSock(FSockList.Items[i]);
    if sock.Fs <> s then
      continue;
    case fd of
      FD_ACCEPT:
        sock.OnAccept(err);
      FD_READ:
        sock.OnRecv(err);
      FD_WRITE:
        sock.OnSend(err);
      FD_CLOSE:
        sock.OnClose(err);
    else
      Assert(False);
    end;
    // The hook may have removed the socket from FSockList; stop iterating.
    exit;
  end;
end;

{ TXSocketSrvr }

// Creates, binds and starts the listening socket and registers it for
// FD_ACCEPT. Returns 0 on success or the Winsock error from bind().
function TXSocketSrvr.SrvrCreate(ip: string; port: WORD): integer;
var
  sai: TSockAddrIn;
begin
  FRef := -1; // marks this object as the listening socket
  Fs := socket(AF_INET, SOCK_STREAM, 0);
  if Fs = INVALID_SOCKET then
    Assert(False);

  FillChar(sai, sizeof(sai), 0); // sin_zero must not contain stack garbage
  sai.sin_family := AF_INET;
  sai.sin_addr.S_addr := inet_addr(PAnsiChar(AnsiString(ip)));
  sai.sin_port := htons(port);
  if 0 <> bind(Fs, sai, sizeof(sai)) then
  begin
    result := WSAGetLastError();
    if 0 <> closesocket(Fs) then
      Assert(False);
    Fs := INVALID_SOCKET;
    exit;
  end;

  if 0 <> listen(Fs, 5) then
    Assert(False);
  SockWnd.FSockList.Add(self);
  if 0 <> WSAAsyncSelect(Fs, SockWnd.Fwnd, WM_SOCKET, FD_ACCEPT) then
    Assert(False);
  result := 0;
end;

function TXSocketSrvr.SrvrClose(): integer;
begin
  Close();
  result := 0;
end;

// Accepts one pending connection, registers it and hands the new TXSocket
// to the notify window via WM_SOCKET_ACCEPT. Event selection for the new
// socket is not done here.
procedure TXSocketSrvr.OnAccept(err: integer);
var
  sa: TSocket;
  sai_peer: TSockAddrIn;
  sai_peer_len: integer;
  so: TXSocket;
begin
  if 0 <> err then
    exit;

  sai_peer_len := sizeof(sai_peer);
  sa := accept(Fs, @sai_peer, @sai_peer_len);
  if sa = INVALID_SOCKET then
  begin
    if WSAEWOULDBLOCK <> WSAGetLastError() then
      Assert(False);
    exit;
  end;

  so := TXSocket.Create;
  so.Fs := sa;
  SockWnd.FSockList.Add(so);

  if SockWnd.FwndNotify = 0 then
    Assert(False);
  SendMessage(SockWnd.FwndNotify, WM_SOCKET_ACCEPT, wParam(0), LPARAM(so));
end;

{ TXSocket }

constructor TXSocket.Create;
begin
  inherited;
  Ffrm := nil;
  FKeepalive := 10 * 1000;
  FKeepaliveTimeout := 15 * 1000;
  FLastRecvTime := GetTickCount();
  Fmem := nil;
  Fwnd := 0;
  Fmsg := 0;
end;

destructor TXSocket.Destroy;
begin
  // A message that was only partially received is still owned by us.
  Fmem.Free;
  inherited;
end;

procedure TXSocket.Ref();
begin
  if FRef < 0 then
    Assert(False);
  FRef := FRef + 1;
end;

// Drops one reference; the object frees itself when the count reaches 0.
procedure TXSocket.Unref();
begin
  if FRef < 1 then
    Assert(False);
  FRef := FRef - 1;
  if FRef = 0 then
    self.Free;
end;

// Sets the window/message pair that receives transfer progress
// (wParam = bytes done, lParam = bytes total).
procedure TXSocket.SetPrbWnd(wnd: Hwnd; msg: UINT);
begin
  Fwnd := wnd;
  Fmsg := msg;
end;

// FD_READ handler. Reads as much of the current message as one recv()
// delivers: first the 12-byte header, then the payload whose length the
// header announces. A complete message is appended to FListR.
procedure TXSocket.OnRecv(err: integer);
var
  cb, size: integer;
begin
  if 0 <> err then
  begin
    Close();
    exit;
  end;

  if Fmem = nil then
  begin
    Fmem := TXStream.Create;
    Fmem.SetSize(XS_HEADER_SIZE);
  end;

  if Fmem.Position < XS_HEADER_SIZE then
  begin
    cb := XS_HEADER_SIZE - Fmem.Position;
    if Fwnd <> 0 then
      SendMessage(Fwnd, Fmsg, Fmem.Position, XS_HEADER_SIZE);
  end
  else
  begin
    size := PInteger(Fmem.Memory)^;
    cb := XS_HEADER_SIZE + size;
    Fmem.SetSize(cb);
    cb := cb - Fmem.Position;
  end;
  if cb <= 0 then
    Assert(False);

  cb := recv(Fs, (Pbyte(Fmem.Memory) + Fmem.Position)^, cb, 0);
  if cb = 0 then
  begin
    // Close() has already notified the application, which may have released
    // this object; do not touch it any further.
    Close();
    exit;
  end
  else if cb < 0 then
  begin
    if WSAEWOULDBLOCK <> WSAGetLastError() then
    begin
      Close();
      exit;
    end;
  end
  else
  begin
    Fmem.Position := Fmem.Position + cb;
    if Fmem.Position < XS_HEADER_SIZE then
    begin
      // The size field is not complete yet, so it must not be interpreted.
      if Fwnd <> 0 then
        SendMessage(Fwnd, Fmsg, Fmem.Position, XS_HEADER_SIZE);
    end
    else
    begin
      size := PInteger(Fmem.Memory)^;
      if size < 0 then
      begin
        // A negative length can only come from a corrupt or hostile peer.
        Close();
        exit;
      end;
      if Fwnd <> 0 then
        SendMessage(Fwnd, Fmsg, Fmem.Position, XS_HEADER_SIZE + size);
      if Fmem.Position = XS_HEADER_SIZE + size then
      begin
        // Message complete: hand it to the receive queue, which now owns it.
        FLock.Enter;
        try
          FListR.Add(Fmem);
        finally
          FLock.Leave;
        end;
        Fmem := nil;
      end;
    end;
  end;

  SelectEvent();
  if FEnableNotify then
    SendMessage(SockWnd.FwndNotify, WM_SOCKET_RECV, wParam(0), LPARAM(self));
end;

// FD_WRITE handler. Sends as much of the first queued stream as one send()
// accepts and removes the stream once it has gone out completely.
procedure TXSocket.OnSend(err: integer);
var
  mem: TMemoryStream;
  cb: integer;
begin
  if 0 <> err then
  begin
    Close();
    exit;
  end;
  if FListS.Count = 0 then
  begin
    SelectEvent();
    exit;
  end;

  mem := TMemoryStream(FListS.Items[0]);
  Assert(mem.Position < mem.size);
  if (Fwnd <> 0) and (mem.Position = 0) then
    SendMessage(Fwnd, Fmsg, 0, mem.size);

  cb := send(Fs, (Pbyte(mem.Memory) + mem.Position)^,
    mem.size - mem.Position, 0);
  if cb = 0 then
  begin
    // Close() freed the queue (including mem) and notified the application.
    Close();
    exit;
  end
  else if cb < 0 then
  begin
    if WSAEWOULDBLOCK <> WSAGetLastError() then
    begin
      Close();
      exit;
    end;
  end
  else
  begin
    if Fwnd <> 0 then
      SendMessage(Fwnd, Fmsg, mem.Position + cb, mem.size);
    if mem.Position + cb = mem.size then
    begin
      FLock.Enter;
      try
        FListS.Remove(mem);
      finally
        FLock.Leave;
      end;
      mem.Free;
    end
    else
      mem.Seek(mem.Position + cb, soBeginning);
  end;
  SelectEvent();
end;

procedure TXSocket.OnClose(err: integer);
begin
  // The error code is not inspected: the connection is closed either way.
  Close();
end;

// Non-blocking variant of SoRecvStream: returns WSAEWOULDBLOCK when
// notifications are disabled or no message is queued. After a read attempt
// it turns WM_SOCKET_RECV notifications off.
function TXSocket.SoPeekStream(var ucmd: integer; var uerr: integer;
  buf: TXStream): integer;
begin
  result := WSAEWOULDBLOCK;
  if not FEnableNotify then
    exit;
  if FListR.Count = 0 then
    exit;
  result := SoRecvStream(ucmd, uerr, buf);
  FEnableNotify := False;
end;

// Copies up to `len` raw bytes from the first queued stream into `buf`,
// waiting (1 ms polls) until one is available. `cb` receives the number of
// bytes copied. Returns WSAECONNRESET once the socket is closed.
function TXSocket.SoRecv(buf: Pbyte; len: integer; var cb: integer): integer;
var
  mem: TMemoryStream;
begin
  while FListR.Count = 0 do
  begin
    if Fs = INVALID_SOCKET then
    begin
      result := WSAECONNRESET;
      exit;
    end;
    Sleep(1);
  end;
  if Fs = INVALID_SOCKET then
  begin
    result := WSAECONNRESET;
    exit;
  end;

  mem := TMemoryStream(FListR.Items[0]);
  cb := mem.size - mem.Position;
  if cb > len then
    cb := len;
  CopyMemory(buf, Pbyte(mem.Memory) + mem.Position, cb);
  if mem.Position + cb = mem.size then
  begin
    FLock.Enter;
    try
      FListR.Remove(mem);
    finally
      FLock.Leave;
    end;
    mem.Free;
  end
  else
    mem.Seek(mem.Position + cb, soBeginning);
  SelectEvent();
  result := 0;
end;

// Receives exactly `len` raw bytes through SoRecv.
function TXSocket.SoRecvBytes(buf: Pbyte; len: integer): integer;
var
  cb, off: integer;
begin
  off := 0;
  while off < len do
  begin
    result := SoRecv(buf + off, len - off, cb);
    if result <> 0 then
      exit;
    off := off + cb;
  end;
  result := 0;
end;

// Waits (1 ms polls) for the next complete message, then unpacks its header
// into `ucmd` / `uerr` and its payload into `buf` (positioned at 0).
// Returns WSAECONNRESET once the socket is closed and nothing is queued.
function TXSocket.SoRecvStream(var ucmd: integer; var uerr: integer;
  buf: TXStream): integer;
var
  mem: TXStream;
  ulen: dword;
begin
  while FListR.Count = 0 do
  begin
    if Fs = INVALID_SOCKET then
    begin
      result := WSAECONNRESET;
      exit;
    end;
    Sleep(1);
  end;

  FLock.Enter;
  try
    mem := TXStream(FListR.Items[0]);
    mem.Seek(0, soBeginning);
    mem.ReadData(ulen);
    mem.ReadData(ucmd);
    mem.ReadData(uerr);
    buf.SetSize(ulen);
    CopyMemory(buf.Memory, Pbyte(mem.Memory) + XS_HEADER_SIZE, ulen);
    buf.Seek(0, soBeginning);
    FListR.Remove(mem);
  finally
    FLock.Leave;
  end;
  mem.Free;
  SelectEvent();
  result := 0;
end;

// Queues up to 4096 raw bytes from `buf` for sending; `cb` receives the
// number of bytes queued. Waits (1 ms polls) while the send queue is full.
// Returns WSAECONNRESET once the socket is closed.
function TXSocket.SoSend(buf: Pbyte; len: integer; var cb: integer): integer;
var
  mem: TMemoryStream;
begin
  while TRUE do
  begin
    if Fs = INVALID_SOCKET then
    begin
      result := WSAECONNRESET;
      exit;
    end;
    // Flow control belongs to the SEND queue; checking the receive queue
    // here would stall sending whenever unread messages pile up.
    if FListS.Count < XS_QUEUE_LIMIT then
      break;
    Sleep(1);
  end;

  cb := XS_SEND_CHUNK;
  if cb > len then
    cb := len;
  mem := TMemoryStream.Create;
  mem.SetSize(cb);
  CopyMemory(mem.Memory, buf, cb);
  FLock.Enter;
  try
    FListS.Add(mem);
  finally
    FLock.Leave;
  end;
  SelectEvent();
  result := 0;
end;

// Queues exactly `len` raw bytes through SoSend.
function TXSocket.SoSendBytes(buf: Pbyte; len: integer): integer;
var
  cb, off: integer;
begin
  off := 0;
  while off < len do
  begin
    result := SoSend(buf + off, len - off, cb);
    if result <> 0 then
      exit;
    off := off + cb;
  end;
  result := 0;
end;

// Builds one framed message (size, command, error, payload) and queues it
// for sending. Waits (1 ms polls) while more than 4 messages are pending.
// Returns WSAECONNRESET once the socket is closed.
function TXSocket.SoSendStream(ucmd: integer; uerr: integer;
  buf: TXStream): integer;
var
  mem: TXStream;
begin
  mem := TXStream.Create;
  mem.WriteData(dword(buf.size));
  mem.WriteData(ucmd);
  mem.WriteData(uerr);
  mem.Write(buf.Memory^, buf.size);
  mem.Seek(0, soBeginning);

  while FListS.Count > XS_QUEUE_LIMIT do
  begin
    if Fs = INVALID_SOCKET then
    begin
      mem.Free; // never queued, so nobody else would release it
      result := WSAECONNRESET;
      exit;
    end;
    Sleep(1);
  end;

  FLock.Enter;
  try
    FListS.Add(mem);
  finally
    FLock.Leave;
  end;
  SelectEvent();
  result := 0;
end;

end.
来源:freebuf.com 2021-06-21 20:48:11 by: PEHacker
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END
喜欢就支持一下吧
请登录后发表评论
注册