지금까지 한 작업은 Listener에서 접속을 시도한 클라이언트를 접속시켜주는 기능만 있었다.
이번에는 세션을 이용해서 클라이언트의 데이터를 Recv하는 기능을 만들어 줄 것이다.
Session.h
#pragma once
#include "IocpCore.h"
#include "IocpEvent.h"
#include "NetAddress.h"
class Service;
/*======================
*
* Session
*
======================*/
//클라이언트가 접속을 했을 때
//클라와 관련된 모든 정보를 Session 클래스로 관리함.
class Session : public IocpObject
{
friend class Listener;
friend class IocpCore;
friend class Service;
public:
Session();
virtual ~Session();
public:
void Disconnect(const WCHAR* cause);
shared_ptr<Service> GetService() { return _service.lock(); }
void SetService(shared_ptr<Service> service) { _service = service; }
public:
/* 정보 관련 */
void SetNetAddress(NetAddress address) { _netAddress = address; }
NetAddress GetAddress() { return _netAddress; }
SOCKET GetSocket() { return _socket; }
bool IsConnected() { return _connected; }
SessionRef GetSessionRef() { return static_pointer_cast<Session>(shared_from_this()); }
private:
/* 인터페이스 구현 */
virtual HANDLE GetHandle() override;
virtual void Dispatch(class IocpEvent* iocpEvent, int32 numOfBytes = 0) override;
private:
/* 전송 관련 */
void RegisterConnect();
void RegisterRecv();
void RegisterSend();
void ProcessConnect();
void ProcessRecv(int32 numOfBytes);
void ProcessSend(int32 numOfBytes);
void HandleError(int32 errorCode);
protected:
/* 컨텐츠 코드에서 오버로딩 */
virtual void OnConnected() {}
virtual int32 OnRecv(BYTE* buffer, int32 len) { return len; }
virtual void OnSend(int32 len) {}
virtual void OnDisConnected() {}
public:
//TEMP
char _recvBuffer[1000];
private:
weak_ptr<Service> _service;
SOCKET _socket = INVALID_SOCKET;
NetAddress _netAddress = {};
Atomic<bool> _connected = false;
private:
USE_LOCK;
/* 수신 관련 */
/* 송신 관련 */
private:
/* IocpEvent 재사용 */
RecvEvent _recvEvent;
};
우리가 Session을 상속받아서 아래처럼 컨텐츠 코드에서 사용하는데,
그렇게 되면 Session 클래스에서 네트워크와 관련된 내부적으로만 사용되는 애들을 숨겨주는게 좋을 것 같다.
하지만 그렇다고 private으로 막아 놓으면 내부 코드에서도 사용할 수 없으니 friend 클래스로 선언해준다.
그리고 Session에 전송 관련 함수를 만들어 준다.
(사용 예로는 세션이 성공적으로 접속 되었을 때 Listener에서 세션쪽 함수인 OnConnected를 호출해서 접속했다고 알려줌.)
void RegisterConnect();
void RegisterRecv();
void RegisterSend();
void ProcessConnect();
void ProcessRecv(int32 numOfBytes);
void ProcessSend(int32 numOfBytes);
void HandleError(int32 errorCode);
지금은 RegisterConnect()를 하지 않고, Connection이 일어나면 Listener에서 바로 ProcessConnect를 호출하고 있다.
경우에 따라서 서비스 중에서 클라이언트 서비스로 동작을 하게 되면 직접적으로 RegisterConnect()를 호출해서
우리가 만든 ConnectEx 함수를 호출하여 결과를 IOCP를 통해 통보받게 된다.
컨텐츠 코드에서 오버로딩하여 사용할 함수도 만들어 준다.
virtual void OnConnected() {} //연결 되었을 때
virtual int32 OnRecv(BYTE* buffer, int32 len) { return len; } //패킷을 받았을 때
virtual void OnSend(int32 len) {} //데이터를 성공적으로 보냈을 때
virtual void OnDisConnected() {} //연결이 끊어졌을 때
마지막으로 내부에서도 Service에 대한 존재를 알아야 Service 자신을 등록하거나 해제하는 행동을 할 수 있기 때문에
weak_ptr<Service> _service;
를 추가해준다.
shared_ptr이 아닌 이유는 순환을 최대한 줄이기 위해서이다. (Service 내에서 Session을 물고 있음)
서버가 크래시 나거나 종료되지 않는 이상 Service는 항상 어딘가에 존재하기 때문에 weak_ptr을 이용해 바로 사용하도록 하였다.
Session.cpp
#include "pch.h"
#include "Session.h"
#include "SocketUtils.h"
#include "Service.h"
Session::Session()
{
_socket = SocketUtils::CreateSocket();
}
Session::~Session()
{
SocketUtils::CloseSocket(_socket);
}
void Session::Disconnect(const WCHAR* cause)
{
//T exchange(T t, U u) : t에 u를 할당, 원래 t를 반환
//fetch_add 느낌
if (_connected.exchange(false) == false)
return;
//TEMP
wcout << "Disconnect : " << cause << endl;
OnDisConnected(); //컨텐츠 코드에서 오버로딩
SocketUtils::CloseSocket(_socket);
GetService()->ReleaseSession(GetSessionRef());
}
HANDLE Session::GetHandle()
{
return reinterpret_cast<HANDLE>(_socket);
}
void Session::Dispatch(IocpEvent* iocpEvent, int32 numOfBytes)
{
switch (iocpEvent->eventType)
{
case EventType::Connect:
ProcessConnect();
break;
case EventType::Recv:
ProcessRecv(numOfBytes);
break;
case EventType::Send:
ProcessSend(numOfBytes);
break;
default:
break;
}
}
void Session::RegisterConnect()
{
}
void Session::RegisterRecv()
{
if (IsConnected() == false)
return;
_recvEvent.Init();
_recvEvent.owner = shared_from_this(); //레퍼런스 카운트 1 증가
WSABUF wsaBuf;
wsaBuf.buf = reinterpret_cast<char*>(_recvBuffer);
wsaBuf.len = len32(_recvBuffer);
DWORD numOfBytes = 0;
DWORD flags = 0;
if (::WSARecv(_socket, &wsaBuf, 1, OUT &numOfBytes, OUT &flags, &_recvEvent, nullptr) == SOCKET_ERROR)
{
int32 errCode = ::WSAGetLastError();
if (errCode != WSA_IO_PENDING)
{
HandleError(errCode);
_recvEvent.owner = nullptr;
}
}
}
void Session::RegisterSend()
{
}
void Session::ProcessConnect()
{
_connected.store(true);
//세션 등록
GetService()->AddSession(GetSessionRef());
//컨텐츠 코드에서 오버로딩
OnConnected();
//수신 등록
RegisterRecv();
}
void Session::ProcessRecv(int32 numOfBytes)
{
//성공적으로 비동기 WSARecv()가 완료된 상태이니까 레퍼런스 카운트를 1감소 시켜준다.
_recvEvent.owner = nullptr;
if (numOfBytes == 0)
{
Disconnect(L"Recv 0");
return;
}
//TODO
cout << "Recv Data Len = " << numOfBytes << endl;
RegisterRecv();
}
void Session::ProcessSend(int32 numOfBytes)
{
}
void Session::HandleError(int32 errorCode)
{
switch (errorCode)
{
case WSAECONNRESET:
case WSAECONNABORTED:
Disconnect(L"HandleError");
break;
default:
//TODO : Log
cout << "Handle Error : " << errorCode << endl;
break;
}
}
void Session::ProcessConnect()
Atomic으로 만든 접속 여부를 알려주는 _connected에 true 값을 넣어서 접속을 했단 것을 알려준다.
그리고 Service에 Session을 등록해준다.
접속했을 때 사용할 OnConnected() 함수를 호출해준다.
마지막으로 RegisterRecv()를 호출해주어 수신 등록을 한다.
void Session::RegisterRecv()
_recvEvent.owner를 자신의 shared_ptr로 설정해 주면서 레퍼런스 카운트를 1증가 시켜준다.
실질적으로 수신이 일어나는 곳.
WSARecv()를 호출한다.
만약 수신에 실패했을 경우 _recvEvent의 owner를 nullptr로 밀어서 레퍼런스 카운트를 1 감소 시켜줘야 한다.
만약 errCode가 PENDING이 아니라고 하면 IOCP에 성공적으로 완료 통지가 뜨지 않기 때문에
아래와 같이 처리하지 않으면 레퍼런스 카운팅이 줄어들지 않으면서 Session은 영영 삭제가 되지 않아
Memory Leak 현상이 일어날 것이다.
추가로 Recv는 한번에 한개의 쓰레드만 RegisterRecv를 호출하게 될 것이므로,
멀티쓰레드 환경을 고려하지 않아도 괜찮다.
왜냐하면 RegisterRecv를 리스너를 통해 연결되었을 때 한 번, 그 이후로는 데이터를 recv한 쓰레드가 다시 RegisterRecv를 호출해서 WSARecv를 호출해놓는다.
즉, 낚싯대가 하나밖에 없으므로 여러개의 쓰레드가 동시에 WSARecv를 호출할 일이 없다.
void Session::ProcessRecv(int32 numOfBytes)
ProcessRecv가 호출이 되었단 것은 성공적으로 비동기 WSARecv()가 완료된 상태이므로 _recvEvent.owner의
레퍼런스 카운트를 다시 1감소 시켜준다.
(그리고 이런식으로 _recvEvent를 계속 재사용 한다면 Owner를 날리지 않아도 되지만, 확실하게 Owner의 생명주기를 관리해 주기 위해서 날려준다.)
만약 받은 데이터의 길이가 0일경우 연결이 끊어진 것이므로, Disconnect()를 호출하고 return 해준다.
수신 완료후 다시 RegisterRecv()를 호출해서 수신 등록 해줌으로써 반복적으로 데이터를 수신할 수 있게 해준다.
void Session::HandleError(int32 errorCode)
간단하게 errorCode의 종류에 맞는 처리를 해준다.
지금은 간단하게 로그를 작성하도록 하였지만, 나중에는 세세하게 작성하도록 수정할 것이다.
콘솔로 출력 하는 것도(cout) 쓰레드 작업으로 부터 안전하긴 하지만 어느정도 컨텍스트 스위칭 비용이 발생하므로 일감형태로 만들어서 로그만 찍는 스레드에게 떠 넘기는 경우가 있다.
void Session::Dispatch(IocpEvent* iocpEvent, int32 numOfBytes)
IocpEvent의 이벤트 타입에 따른 함수를 호출한다.
void Session::Disconnect(const WCHAR* cause)
특별할 것 없이 연결을 끊어주는 함수이다.
OnDisConnected()를 호출해준다.
이제 이렇게 클라이언트에서 온 데이터를 정상적으로 수신하는 모습을 볼 수 있다.
그리고 중간에 클라이언트를 종료 시키면 Recv 0이라는 로그와 함께 정상적으로 Session이 소멸되는 모습도 볼 수 있다.
하지만 만약 ProcessRecv()에서 레퍼런스 카운트 감소를 시켜주지 않는다면,
Recv 0 이라는 오류와 함께 Disconnect는 호출이 되지만 Session이 소멸되지 않는 모습을 볼 수 있다.(MemoryLeak 발생)
지금 코드의 전반적인 흐름이
클라이언트가 접속하면 우선 Listener의 Dispatch()가 호출되고,
session->ProcessConnect()를 호출할 것이다. (세션을 만들면서 이미 IOCP에 세션을 등록해놓음.)
RegisterRecv()를 하는 순간 WSARecv()를 호출하면서 완료 통보를 IocpCore를 통해 받게 된다.
그러면 IocpCore를 관찰하면서 완료통보를 기다리던 쓰레드들이
GetQueuedCompletionStatus() 함수를 통해 iocpEvent->owner(EventType::Recv)의 Dispatch()를 실행시켜준다.
그러면 ProcessRecv()가 호출이 되고,
ProcessRecv()는 다시 RegisterRecv()를 호출해줌으로써 반복적으로 수신을 할 수 있도록 해준다.
'네트워크 프로그래밍' 카테고리의 다른 글
Session (3) (0) | 2023.03.17 |
---|---|
Session (2) (0) | 2023.03.17 |
Server Service (0) | 2023.03.03 |
IocpCore (0) | 2023.02.26 |
Socket Utils (0) | 2023.02.26 |