大家好,又见面了,我是你们的朋友全栈君。
BlockQueue.h
代码语言:javascript复制#pragma once
#include "Common.h"
#include "Condition.h"
template<typename T>
class BlockQueue
{
public:
BlockQueue()
: mutex_()
, notEmpty_(mutex_)
{
}
void put(const T &t)
{
MutexLockGuard lock(mutex_);
queue_.push_back(t);
notEmpty_.notify();
}
T take()
{
MutexLockGuard lock(mutex_);
while (queue_.empty()) {
notEmpty_.wait();
}
assert(!queue_.empty());
T front(queue_.front());
queue_.pop_front();
return front;
}
size_t size() const
{
MutexLockGuard lock(mutex_);
return queue_.size();
}
private:
BlockQueue(const BlockQueue&);
BlockQueue& operator=(const BlockQueue&);
private:
mutable MutexLock mutex_;
Condition notEmpty_;
std::deque<T> queue_;
};
Buffer.h
代码语言:javascript复制#pragma once
#include "Common.h"
class Buffer;
typedef std::tr1::shared_ptr<Buffer> BufferPtr;
class Buffer
{
public:
static const size_t kCheapPrepend = 8;
static const size_t kInitialSize = 2048;
Buffer()
: buffer_(kCheapPrepend kInitialSize)
, readerIndex_(kCheapPrepend)
, writerIndex_(kCheapPrepend)
{
assert(readableBytes() == 0);
assert(writableBytes() == kInitialSize);
assert(prependableBytes() == kCheapPrepend);
}
void swap(Buffer &rhs)
{
buffer_.swap(rhs.buffer_);
std::swap(readerIndex_, rhs.readerIndex_);
std::swap(writerIndex_, rhs.writerIndex_);
}
size_t readableBytes() const
{
return writerIndex_ - readerIndex_;
}
size_t writableBytes() const
{
return buffer_.size() - writerIndex_;
}
size_t prependableBytes() const
{
return readerIndex_;
}
const char *peek() const
{
return begin() readerIndex_;
}
void retrieve(size_t len)
{
assert(len <= readableBytes());
if ( len < readableBytes())
{
readerIndex_ = len;
}
else
{
retrieveAll();
}
}
void retrieveUntil(const char *end)
{
assert(peek() <= end);
assert(end <= beginWrite());
retrieve(end - peek());
}
void retrieveInt32()
{
retrieve(sizeof(__int32));
}
void retrieveInt16()
{
retrieve(sizeof(__int16));
}
void retrieveInt8()
{
retrieve(sizeof(__int8));
}
void retrieveAll()
{
readerIndex_ = kCheapPrepend;
writerIndex_ = kCheapPrepend;
}
std::string retrieveAllAsString()
{
return retrieveAsString(readableBytes());
}
std::string retrieveAsString(size_t len)
{
assert(len <= readableBytes());
std::string result(peek(), len);
retrieve(len);
return result;
}
void append(const std::string &str)
{
append(str.c_str(), str.size());
}
void append(const char* data, size_t len)
{
ensureWritableBytes(len);
std::copy(data, data len, stdext::checked_array_iterator<char*>(beginWrite(), len));
hasWritten(len);
}
void append(const void* data, size_t len)
{
append(static_cast<const char*>(data), len);
}
void ensureWritableBytes(size_t len)
{
if (writableBytes() < len)
{
makeSpace(len);
}
assert(writableBytes() >= len);
}
char* beginWrite()
{
return begin() writerIndex_;
}
const char* beginWrite() const
{
return begin() writerIndex_;
}
void hasWritten(size_t len)
{
writerIndex_ = len;
}
void appendInt32(__int32 x);
void appendInt16(__int16 x);
void appendInt8(__int8 x);
__int32 readInt32();
__int16 readInt16();
__int8 readInt8();
__int32 peekInt32() const;
__int16 peekInt16() const;
__int8 peekInt8() const;
void prependInt32(__int32 x);
void prependInt16(__int16 x);
void prependInt8(__int8 x);
bool Buffer::read(void* data, size_t len)
{
if (data && readableBytes() >= len)
{
memcpy(data, peek(), len);
retrieve(len);
return true;
}
return false;
}
void prepend(const void* data, size_t len)
{
assert(len <= prependableBytes());
readerIndex_ -= len;
const char *d = static_cast<const char*>(data);
std::copy(d, d len, stdext::checked_array_iterator<char*>(begin() readerIndex_, len));
}
int readFd(void* fd, int *saveErrno);
private:
char *begin()
{
return &*buffer_.begin();
}
const char* begin() const
{
return &*buffer_.begin();
}
void makeSpace(size_t len)
{
if (writableBytes() prependableBytes() < len kCheapPrepend)
{
buffer_.resize(writerIndex_ len);
}
else
{
assert(kCheapPrepend < readerIndex_);
size_t readable = readableBytes();
std::copy(begin() readerIndex_, begin() writerIndex_,
stdext::checked_array_iterator<char*>(begin() kCheapPrepend, writerIndex_ - readerIndex_));
readerIndex_ = kCheapPrepend;
writerIndex_ = readerIndex_ readable;
assert(readable == readableBytes());
}
}
Buffer(const Buffer&);
const Buffer& operator=(const Buffer&);
private:
std::vector<char> buffer_;
size_t readerIndex_;
size_t writerIndex_;
};
Channel.h
代码语言:javascript复制#pragma once
#include "Buffer.h"
#include "BlockQueue.h"
#define TCPCONN_SUCCESS 0
#define TCPCONN_ERROR_BASE 0x00100
#define TCPCONN_ERROR_SOCKET TCPCONN_ERROR_BASE 1
#define TCPCONN_ERROR_CONNECT TCPCONN_ERROR_BASE 2
#define TCPCONN_ERROR_SEND TCPCONN_ERROR_BASE 3
#define TCPCONN_ERROR_RECV TCPCONN_ERROR_BASE 4
#define TCPCONN_ERROR_WSAEVENTSELECT TCPCONN_ERROR_BASE 5
#define TCPCONN_ERROR_WSAENUMNETWORKEVENTS TCPCONN_ERROR_BASE 6
#define TCPCONN_ERROR_WSAWAITFORMULTIPLEEVENTS TCPCONN_ERROR_BASE 7
typedef std::tr1::function<void(Buffer&)> OnReadCallback;
typedef std::tr1::function<void(int)> OnErrorCallback;
typedef std::tr1::function<void()> OnCloseCallback;
class Connector;
class Thread;
template<typename T> class BlockQueue;
typedef std::tr1::shared_ptr<Connector> ConnectorPtr;
class Channel
{
public:
Channel(ConnectorPtr connectorPtr);
~Channel();
void start();
void stop();
void send(const unsigned char *msg, size_t len);
void send(const BufferPtr &msg);
void setOnReadCallback(const OnReadCallback &cb) { onReadCallback_ = cb; }
void setOnErrorCallback(const OnErrorCallback &cb) { onErrorCallback_ = cb; }
void setOnCloseCallback(const OnCloseCallback &cb) { onCloseCallback_ = cb; }
private:
Channel(const Channel&);
Channel& operator=(const Channel&);
void sendBufferThread();
void recvBufferThread();
private:
ConnectorPtr connectorPtr_;
scoped_ptr<Thread> recvBufferThread_;
scoped_ptr<Thread> sendBufferThread_;
scoped_ptr<BlockQueue<BufferPtr> > sendQueue_;
Buffer recvBuffer_;
OnReadCallback onReadCallback_;
OnErrorCallback onErrorCallback_;
OnCloseCallback onCloseCallback_;
};
Common.h
代码语言:javascript复制#pragma once
#include <assert.h>
#include <stdlib.h>
#include <cstddef>
#include <string.h>
#include <memory>
#include <functional>
#include <string>
#include <deque>
#include <algorithm>
#include <vector>
#include <map>
#include <iterator>
#include <WinSock2.h>
#include <Windows.h>
#include <iostream> // for test
namespace internal {
// This is an implementation designed to match the anticipated future TR2
// implementation of the scoped_ptr class, and its closely-related brethren,
// scoped_array, scoped_ptr_malloc, and make_scoped_ptr.
template <class C> class scoped_ptr;
template <class C> class scoped_array;
// A scoped_ptr<T> is like a T*, except that the destructor of scoped_ptr<T>
// automatically deletes the pointer it holds (if any).
// That is, scoped_ptr<T> owns the T object that it points to.
// Like a T*, a scoped_ptr<T> may hold either NULL or a pointer to a T object.
//
// The size of a scoped_ptr is small:
// sizeof(scoped_ptr<C>) == sizeof(C*)
template <class C>
class scoped_ptr {
public:
// The element type
typedef C element_type;
// Constructor. Defaults to intializing with NULL.
// There is no way to create an uninitialized scoped_ptr.
// The input parameter must be allocated with new.
explicit scoped_ptr(C* p = NULL) : ptr_(p) { }
// Destructor. If there is a C object, delete it.
// We don't need to test ptr_ == NULL because C does that for us.
~scoped_ptr() {
enum { type_must_be_complete = sizeof(C) };
delete ptr_;
}
// Reset. Deletes the current owned object, if any.
// Then takes ownership of a new object, if given.
// this->reset(this->get()) works.
void reset(C* p = NULL) {
if (p != ptr_) {
enum { type_must_be_complete = sizeof(C) };
delete ptr_;
ptr_ = p;
}
}
// Accessors to get the owned object.
// operator* and operator-> will assert() if there is no current object.
C& operator*() const {
assert(ptr_ != NULL);
return *ptr_;
}
C* operator->() const {
assert(ptr_ != NULL);
return ptr_;
}
C* get() const { return ptr_; }
// Comparison operators.
// These return whether two scoped_ptr refer to the same object, not just to
// two different but equal objects.
bool operator==(C* p) const { return ptr_ == p; }
bool operator!=(C* p) const { return ptr_ != p; }
// Swap two scoped pointers.
void swap(scoped_ptr& p2) {
C* tmp = ptr_;
ptr_ = p2.ptr_;
p2.ptr_ = tmp;
}
// Release a pointer.
// The return value is the current pointer held by this object.
// If this object holds a NULL pointer, the return value is NULL.
// After this operation, this object will hold a NULL pointer,
// and will not own the object any more.
C* release() {
C* retVal = ptr_;
ptr_ = NULL;
return retVal;
}
private:
C* ptr_;
// Forbid comparison of scoped_ptr types. If C2 != C, it totally doesn't
// make sense, and if C2 == C, it still doesn't make sense because you should
// never have the same object owned by two different scoped_ptrs.
template <class C2> bool operator==(scoped_ptr<C2> const& p2) const;
template <class C2> bool operator!=(scoped_ptr<C2> const& p2) const;
// Disallow evil constructors
scoped_ptr(const scoped_ptr&);
void operator=(const scoped_ptr&);
};
// scoped_array<C> is like scoped_ptr<C>, except that the caller must allocate
// with new [] and the destructor deletes objects with delete [].
//
// As with scoped_ptr<C>, a scoped_array<C> either points to an object
// or is NULL. A scoped_array<C> owns the object that it points to.
//
// Size: sizeof(scoped_array<C>) == sizeof(C*)
template <class C>
class scoped_array {
public:
// The element type
typedef C element_type;
// Constructor. Defaults to intializing with NULL.
// There is no way to create an uninitialized scoped_array.
// The input parameter must be allocated with new [].
explicit scoped_array(C* p = NULL) : array_(p) { }
// Destructor. If there is a C object, delete it.
// We don't need to test ptr_ == NULL because C does that for us.
~scoped_array() {
enum { type_must_be_complete = sizeof(C) };
delete[] array_;
}
// Reset. Deletes the current owned object, if any.
// Then takes ownership of a new object, if given.
// this->reset(this->get()) works.
void reset(C* p = NULL) {
if (p != array_) {
enum { type_must_be_complete = sizeof(C) };
delete[] array_;
array_ = p;
}
}
// Get one element of the current object.
// Will assert() if there is no current object, or index i is negative.
C& operator[](std::ptrdiff_t i) const {
assert(i >= 0);
assert(array_ != NULL);
return array_[i];
}
// Get a pointer to the zeroth element of the current object.
// If there is no current object, return NULL.
C* get() const {
return array_;
}
// Comparison operators.
// These return whether two scoped_array refer to the same object, not just to
// two different but equal objects.
bool operator==(C* p) const { return array_ == p; }
bool operator!=(C* p) const { return array_ != p; }
// Swap two scoped arrays.
void swap(scoped_array& p2) {
C* tmp = array_;
array_ = p2.array_;
p2.array_ = tmp;
}
// Release an array.
// The return value is the current pointer held by this object.
// If this object holds a NULL pointer, the return value is NULL.
// After this operation, this object will hold a NULL pointer,
// and will not own the object any more.
C* release() {
C* retVal = array_;
array_ = NULL;
return retVal;
}
private:
C* array_;
// Forbid comparison of different scoped_array types.
template <class C2> bool operator==(scoped_array<C2> const& p2) const;
template <class C2> bool operator!=(scoped_array<C2> const& p2) const;
// Disallow evil constructors
scoped_array(const scoped_array&);
void operator=(const scoped_array&);
};
} // namespace internal
// We made these internal so that they would show up as such in the docs,
// but we don't want to stick "internal::" in front of them everywhere.
using internal::scoped_ptr;
using internal::scoped_array;
Condition.h
代码语言:javascript复制#pragma once
#include "MutexLock.h"
class Condition
{
public:
explicit Condition(MutexLock &mutex)
: mutex_(mutex)
{
InitializeConditionVariable(&cond_);
}
void wait()
{
SleepConditionVariableCS(&cond_, &mutex_.cs(), INFINITE);
}
void notify()
{
WakeConditionVariable(&cond_);
}
private:
Condition(const Condition&);
Condition& operator=(const Condition&);
private:
MutexLock &mutex_;
CONDITION_VARIABLE cond_;
};
Connector.h
代码语言:javascript复制#pragma once
#include "Common.h"
class Connector
{
public:
typedef std::tr1::function<void(SOCKET fd)> ConnectionCallback;
Connector(const std::string &ip, unsigned short port);
virtual ~Connector(void);
virtual void start();
virtual void stop();
void setConnectionCallback(const ConnectionCallback &func) { func_ = func; }
SOCKET socket() const { return sock_; }
bool connected() const { return connected_; }
private:
void connect();
private:
std::string ip_;
unsigned short port_;
ConnectionCallback func_;
SOCKET sock_;
bool connected_;
};
MutexLock.h
代码语言:javascript复制#pragma once
#include <WinBase.h>
class MutexLock
{
public:
MutexLock()
{
InitializeCriticalSection(&criticalSection_);
}
~MutexLock()
{
DeleteCriticalSection(&criticalSection_);
}
void lock()
{
EnterCriticalSection(&criticalSection_);
}
void unlock()
{
LeaveCriticalSection(&criticalSection_);
}
CRITICAL_SECTION& cs()
{
return criticalSection_;
}
private:
MutexLock(const MutexLock&);
MutexLock& operator=(const MutexLock&);
CRITICAL_SECTION criticalSection_;
};
class MutexLockGuard
{
public:
explicit MutexLockGuard(MutexLock &mutex)
: mutex_(mutex)
{
mutex_.lock();
}
~MutexLockGuard()
{
mutex_.unlock();
}
private:
MutexLockGuard(const MutexLockGuard&);
MutexLockGuard operator=(const MutexLockGuard&);
MutexLock& mutex_;
};
Thread.h
代码语言:javascript复制#pragma once
#include "Common.h"
class Thread
{
public:
typedef std::tr1::function<void()> ThreadFunc;
explicit Thread(const ThreadFunc&, const std::string &name=std::string());
~Thread();
void start();
int join();
bool started() const { return started_; }
HANDLE handle() const { return handle_; }
unsigned int tid() const { return tid_; }
const std::string& name() const { return name_; }
private:
static unsigned int WINAPI startThread(void *data);
void runInThread();
private:
HANDLE handle_;
unsigned int tid_;
bool started_;
ThreadFunc func_;
std::string name_;
};
WSAStartup.h
代码语言:javascript复制#pragma once
class WsaStartup
{
public:
WsaStartup(unsigned char majorVer, unsigned char minorVer);
virtual ~WsaStartup(void);
};
Buffer.cpp
代码语言:javascript复制#include "Buffer.h"
#include <WinSock2.h>
void Buffer::appendInt32(__int32 x)
{
__int32 be32 = htonl(x);
append(&be32, sizeof(be32));
}
void Buffer::appendInt16(__int16 x)
{
__int16 be16 = htons(x);
append(&be16, sizeof(be16));
}
void Buffer::appendInt8(__int8 x)
{
append(&x, sizeof(x));
}
__int32 Buffer::readInt32()
{
__int32 result = peekInt32();
retrieveInt32();
return result;
}
__int16 Buffer::readInt16()
{
__int16 result = peekInt16();
retrieveInt16();
return result;
}
__int8 Buffer::readInt8()
{
__int8 result = peekInt8();
retrieveInt8();
return result;
}
__int32 Buffer::peekInt32() const
{
assert(readableBytes() >= sizeof(__int32));
__int32 be32 = 0;
::memcpy(&be32, peek(), sizeof(be32));
return ntohl(be32);
}
__int16 Buffer::peekInt16() const
{
assert(readableBytes() >= sizeof(__int16));
__int16 be16 = 0;
::memcpy(&be16, peek(), sizeof(be16));
return htons(be16);
}
__int8 Buffer::peekInt8() const
{
assert(readableBytes() >= sizeof(__int8));
__int8 x = *peek();
return x;
}
void Buffer::prependInt32(__int32 x)
{
__int32 be32 = htonl(x);
prepend(&be32, sizeof(be32));
}
void Buffer::prependInt16(__int16 x)
{
__int16 be16 = htons(x);
prepend(&be16, sizeof(be16));
}
void Buffer::prependInt8(__int8 x)
{
prepend(&x, sizeof(x));
}
int Buffer::readFd(void* fd, int *saveErrno)
{
SOCKET sock = (SOCKET)fd;
int writable = writableBytes();
int n = ::recv(sock, begin() writerIndex_, writable, 0);
if (SOCKET_ERROR == n)
{
*saveErrno = WSAGetLastError();
}
else if (n <= writable)
{
writerIndex_ = n;
}
return n;
}
Channel.cpp
代码语言:javascript复制#include "Channel.h"
#include "Thread.h"
#include "Connector.h"
Channel::Channel(ConnectorPtr connectorPtr)
: connectorPtr_(connectorPtr)
, sendQueue_(new BlockQueue<BufferPtr>())
{
recvBufferThread_.reset(new Thread(std::tr1::bind(&Channel::recvBufferThread, this), "eventHandleThread"));
sendBufferThread_.reset(new Thread(std::tr1::bind(&Channel::sendBufferThread, this), "sendBufferThread"));
}
Channel::~Channel(void)
{
}
void Channel::start()
{
connectorPtr_->start();
recvBufferThread_->start();
sendBufferThread_->start();
}
void Channel::stop()
{
connectorPtr_->stop();
BufferPtr lastBufferPtr(new Buffer());
sendQueue_->put(lastBufferPtr);
recvBufferThread_->join();
sendBufferThread_->join();
sendQueue_.reset(new BlockQueue<BufferPtr>());
}
void Channel::send(const unsigned char *msg, size_t len)
{
BufferPtr bufferPtr(new Buffer());
bufferPtr->append(msg, len);
sendQueue_->put(bufferPtr);
}
void Channel::send(const BufferPtr &msg)
{
sendQueue_->put(msg);
}
void Channel::sendBufferThread()
{
std::cout << "recvBufferThread" << std::endl;
while (connectorPtr_->connected()) {
BufferPtr bufferPtr = sendQueue_->take();
if (!connectorPtr_->connected()) {
break;
}
unsigned int left = bufferPtr->readableBytes();
while (left > 0) {
int sendNum = ::send(connectorPtr_->socket(), bufferPtr->peek(), left, 0);
if (SOCKET_ERROR == sendNum) {
onErrorCallback_(TCPCONN_ERROR_SEND);
break;
}
left -= sendNum;
bufferPtr->retrieve(sendNum);
}
}
std::cout << "recvBufferThread exit" << std::endl;
}
void Channel::recvBufferThread()
{
std::cout << "sendBufferThread" << std::endl;
while (connectorPtr_->connected()) {
static const int bufferLen = 65535;;
char buffer[bufferLen];
int recvNum = ::recv(connectorPtr_->socket(), buffer, bufferLen, 0);
if (recvNum > 0) {
recvBuffer_.append(buffer, recvNum);
onReadCallback_(recvBuffer_);
} else if (0 == recvNum) {
onCloseCallback_();
break;
} else {
onErrorCallback_(TCPCONN_ERROR_RECV);
break;
}
}
std::cout << "sendBufferThread exit" << std::endl;
}
Connector.cpp
代码语言:javascript复制#include "Connector.h"
#include <Windows.h>
Connector::Connector(const std::string &ip, unsigned short port)
: ip_(ip)
, port_(port)
, func_(NULL)
, sock_(INVALID_SOCKET)
, connected_(false)
{
}
Connector::~Connector(void)
{
stop();
}
void Connector::start()
{
connect();
}
void Connector::stop()
{
connected_ = false;
if (INVALID_SOCKET != sock_) {
::shutdown(sock_, 2); // FIXME
::closesocket(sock_);
sock_ = INVALID_SOCKET;
}
}
void Connector::connect()
{
sockaddr_in sin;
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
sin.sin_addr.s_addr = inet_addr(ip_.c_str());
sin.sin_port = htons(port_);
sock_ = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (INVALID_SOCKET != sock_) {
if (0 == ::connect(sock_, (SOCKADDR*)&sin, sizeof(sin))) {
connected_ = true;
} else {
::closesocket(sock_);
sock_ = INVALID_SOCKET;
}
}
}
Thread.cpp
代码语言:javascript复制#include "Thread.h"
#include <process.h>
#include <assert.h>
Thread::Thread(const ThreadFunc &func, const std::string &name)
: handle_(NULL)
, tid_(0)
, started_(false)
, func_(func)
, name_(name)
{
}
Thread::~Thread()
{
CloseHandle(handle_);
}
void Thread::start()
{
started_ = true;
handle_ = (HANDLE)_beginthreadex(NULL, 0, startThread, this, 0, &tid_);
assert(handle_ > 0);
}
int Thread::join()
{
if (GetCurrentThreadId() == tid_) {
return 0;
}
return WaitForSingleObject(handle_, INFINITE);
}
unsigned int Thread::startThread(void *data)
{
Thread *thread = static_cast<Thread*>(data);
thread->runInThread();
return 0;
}
void Thread::runInThread()
{
assert(GetCurrentThreadId() == tid_);
if (func_) {
func_();
}
started_ = false;
}
WsaStartup.cpp
代码语言:javascript复制#include "WsaStartup.h"
#include <Windows.h>
#include <crtdbg.h>
WsaStartup::WsaStartup(unsigned char majorVer, unsigned char minorVer)
{
WSAData wsaData;
_ASSERT(0 == WSAStartup(MAKEWORD(majorVer, minorVer), &wsaData));
}
WsaStartup::~WsaStartup(void)
{
WSACleanup();
}
发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/158351.html原文链接:https://javaforall.cn