You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
whellvalue/src/libtcp/common/tcpClientRecv.cpp

224 lines
5.5 KiB
C++

#include "tcpClientRecv.h"
tcpClientRecv::tcpClientRecv(const QString uuid, QObject *parent /*= 0*/)
: QObject(parent), is_closed_(false), should_recv_size_(0)
{
pDataRecvBuf = new char[65536];
uuid_ = uuid;
socket_ptr_ = new QTcpSocket(this);
connect(socket_ptr_, SIGNAL(connected()), this, SLOT(on_connected()), Qt::DirectConnection);
connect(socket_ptr_, SIGNAL(readyRead()), this, SLOT(on_ready_read()), Qt::DirectConnection);
connect(socket_ptr_, SIGNAL(bytesWritten(qint64)), this, SLOT(on_bytes_written(qint64)), Qt::DirectConnection);
connect(socket_ptr_, SIGNAL(disconnected()), this, SLOT(on_disconnected()), Qt::DirectConnection);
connect(socket_ptr_, SIGNAL(error(QAbstractSocket::SocketError)), this, SLOT(on_socket_error(QAbstractSocket::SocketError)), Qt::DirectConnection);
connect(this, SIGNAL(sgStartSession(const QString, const QString&, qint16)), this, SLOT(on_session_start(const QString, const QString&, qint16)));
}
tcpClientRecv::~tcpClientRecv()
{
if (socket_ptr_) {
socket_ptr_->abort();
socket_ptr_->close();
delete socket_ptr_;
socket_ptr_ = nullptr;
}
if (trans_timeout_timer_) {
trans_timeout_timer_->stop();
delete trans_timeout_timer_;
trans_timeout_timer_ = nullptr;
}
if (connect_timeout_timer_) {
connect_timeout_timer_->stop();
delete connect_timeout_timer_;
connect_timeout_timer_ = nullptr;
}
delete[]pDataRecvBuf;
pDataRecvBuf = nullptr;
}
void tcpClientRecv::set_msg_body(const QJsonObject& msg_body)
{
std::lock_guard<std::mutex> guard(m);
current_recv_size_ = 0;
msg_body_ = msg_body;
should_recv_size_ = msg_body["data_size"].toInt(0);
memset(pDataRecvBuf, 0, 65536);
if (should_recv_size_ != 0)
{
m_pdataRecv.resize(should_recv_size_);
pDataRecv_ = m_pdataRecv.data();
memset(pDataRecv_, 0, should_recv_size_);
}
}
void tcpClientRecv::on_session_start(const QString uuid, const QString& ip_addr, qint16 port)
{
if (!connect_timeout_timer_)
{
connect_timeout_timer_ = new QTimer();
connect_timeout_timer_->setSingleShot(true);
connect_timeout_timer_->setInterval(1500);
connect(connect_timeout_timer_, SIGNAL(timeout()), this, SLOT(on_connect_timeout()));
}
if (!trans_timeout_timer_)
{
trans_timeout_timer_ = new QTimer();
trans_timeout_timer_->setSingleShot(true);
trans_timeout_timer_->setInterval(2000);
connect(trans_timeout_timer_, SIGNAL(timeout()), this, SLOT(on_transfer_timeout()));
}
if (uuid_ == uuid) {
if (connect_timeout_timer_)
{
connect_timeout_timer_->stop();
connect_timeout_timer_->start();
}
begin(ip_addr, port);
}
}
void tcpClientRecv::on_connected()
{
if (should_recv_size_ <= 0)
{
// qWarning() << __FUNCTION__ << endl
// << "Could not get the data size should be received. Should abort connection!";
}
data_received_.clear();
if (connect_timeout_timer_)
{
connect_timeout_timer_->stop();
}
if (trans_timeout_timer_)
{
//if (trans_timeout_timer_->isActive())
trans_timeout_timer_->stop();
trans_timeout_timer_->start();
}
}
void tcpClientRecv::on_ready_read()
{
std::lock_guard<std::mutex> guard(m);
int size = socket_ptr_->read(pDataRecvBuf, 65536);
if (pDataRecvBuf != nullptr && pDataRecv_ != nullptr)
{
if (current_recv_size_ >= should_recv_size_)
return;
char* dataPtr = pDataRecv_ + current_recv_size_;
if ((current_recv_size_ + size) > should_recv_size_)
{
size = should_recv_size_ - current_recv_size_;
}
if (dataPtr != nullptr)
{
current_recv_size_ += size;
memcpy(dataPtr, pDataRecvBuf, size);
}
}
if (trans_timeout_timer_)
{
trans_timeout_timer_->stop();
//trans_timeout_timer_->start();
}
}
void tcpClientRecv::on_bytes_written(qint64 len)
{
}
void tcpClientRecv::on_disconnected()
{
std::lock_guard<std::mutex> guard(m);
if (current_recv_size_ != should_recv_size_)
{
// qWarning() << __FUNCTION__ << endl
// << "Received incomplete data! Should receive: " << should_recv_size_
// << ". Actually received: " << data_received_.size();
}
else
{
QSharedPointer<QByteArray> pdata = QSharedPointer<QByteArray>(new QByteArray());
*pdata = m_pdataRecv;
QSharedPointer<QJsonObject> pimginfo = QSharedPointer<QJsonObject>(new QJsonObject(msg_body_));
emit signal_data_recv_completed(uuid_, pdata, pimginfo);
}
//close();
//qDebug() << __FUNCTION__;
}
void tcpClientRecv::on_socket_error(QAbstractSocket::SocketError err)
{
QString err_str;
switch (err)
{
case QAbstractSocket::AddressInUseError:
err_str = "SOCKET ERROR: Address is already in use";
break;
case QAbstractSocket::ConnectionRefusedError:
err_str = "SOCKET ERROR: Connection refused";
break;
case QAbstractSocket::HostNotFoundError:
err_str = "SOCKET ERROR: Host not found";
break;
case QAbstractSocket::RemoteHostClosedError:
err_str = "SOCKET ERROR: Remote host closed";
break;
}
if (!err_str.isEmpty()) {
//qCritical() << __FUNCTION__ << endl
// << err_str;
}
close();
}
void tcpClientRecv::begin(const QString& ip_addr, qint16 port)
{
if (!socket_ptr_) {
return;
}
socket_ptr_->connectToHost(ip_addr, port);
}
void tcpClientRecv::close()
{
if (socket_ptr_) {
socket_ptr_->abort();
socket_ptr_->close();
}
if (trans_timeout_timer_) {
trans_timeout_timer_->stop();
}
if (connect_timeout_timer_) {
connect_timeout_timer_->stop();
}
emit sgException(uuid_);
}
Q_SLOT void tcpClientRecv::on_connect_timeout()
{
//qWarning() << __FUNCTION__;
//qDebug() << "time out listen ,port:" << curPort_;
close();
}
Q_SLOT void tcpClientRecv::on_transfer_timeout()
{
//qWarning() << __FUNCTION__;
close();
}
/*
<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Ͽ<EFBFBD>ʱ<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ֱ<EFBFBD><EFBFBD><EFBFBD> <EFBFBD>
<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ر<EFBFBD>ʱ<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
*/