diff options
Diffstat (limited to 'httpd/job.cpp')
-rw-r--r-- | httpd/job.cpp | 240 |
1 files changed, 240 insertions, 0 deletions
diff --git a/httpd/job.cpp b/httpd/job.cpp new file mode 100644 index 0000000..49ecba6 --- /dev/null +++ b/httpd/job.cpp @@ -0,0 +1,240 @@ +/* job.cpp + * Copyright (C) 2003-2005 Tommi Maekitalo + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation; either version 2 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, but + * is provided AS IS, WITHOUT ANY WARRANTY; without even the implied + * warranty of MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, and + * NON-INFRINGEMENT. See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + */ + +#include "tnt/job.h" +#include <tnt/httpreply.h> +#include <cxxtools/log.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <arpa/inet.h> + +log_define("tntnet.job") + +namespace tnt +{ + unsigned Job::socket_read_timeout = 200; + unsigned Job::socket_write_timeout = 10000; + unsigned Job::keepalive_max = 1000; + unsigned Job::socket_buffer_size = 16384; + + Job::~Job() + { } + + void Job::clear() + { + parser.reset(); + request.clear(); + touch(); + } + + int Job::msecToTimeout(time_t currentTime) const + { + return (lastAccessTime - currentTime + 1) * 1000 + + getKeepAliveTimeout() + - getSocketReadTimeout(); + } + + unsigned Job::getKeepAliveTimeout() + { + return HttpReply::getKeepAliveTimeout(); + } + + //////////////////////////////////////////////////////////////////////// + // Tcpjob + // + void Tcpjob::accept(const cxxtools::net::Server& listener) + { + log_debug("accept"); + socket.accept(listener); + + struct sockaddr_storage s = socket.getSockAddr(); + struct sockaddr_storage sockaddr; + memcpy(&sockaddr, &s, sizeof(sockaddr)); + + char buffer[INET6_ADDRSTRLEN]; + log_debug("connection accepted from " + << inet_ntop(AF_INET6, &(socket.getPeeraddr()), buffer, sizeof(buffer))); + + getRequest().setPeerAddr(socket.getPeeraddr()); + getRequest().setServerAddr(sockaddr); + getRequest().setSsl(false); + } + + std::iostream& Tcpjob::getStream() + { + return socket; + } + + int Tcpjob::getFd() const + { + return socket.getFd(); + } + + void Tcpjob::setRead() + { + socket.setTimeout(getSocketReadTimeout()); + } + + void Tcpjob::setWrite() + { + socket.setTimeout(getSocketWriteTimeout()); + } + +#ifdef USE_SSL + //////////////////////////////////////////////////////////////////////// + // SslTcpjob + // + void SslTcpjob::accept(const SslServer& listener) + { + log_debug("accept (ssl)"); + socket.accept(listener); + log_debug("connection accepted (ssl)"); + + struct sockaddr_storage s = socket.getSockAddr(); + struct sockaddr_storage sockaddr; + memcpy(&sockaddr, &s, sizeof(sockaddr)); + + getRequest().setPeerAddr(socket.getPeeraddr()); + getRequest().setServerAddr(sockaddr); + getRequest().setSsl(true); + + setRead(); + } + + std::iostream& SslTcpjob::getStream() + { + return socket; + } + + int SslTcpjob::getFd() const + { + return socket.getFd(); + } + + void SslTcpjob::setRead() + { + socket.setTimeout(getSocketReadTimeout()); + } + + void SslTcpjob::setWrite() + { + socket.setTimeout(getSocketWriteTimeout()); + } + +#endif // USE_SSL + +#ifdef USE_GNUTLS + //////////////////////////////////////////////////////////////////////// + // GnuTlsTcpjob + // + void GnuTlsTcpjob::accept(const GnuTlsServer& listener) + { + log_debug("accept (ssl)"); + socket.accept(listener); + log_debug("connection accepted (ssl)"); + + struct sockaddr_storage s = socket.getSockAddr(); + struct sockaddr_storage sockaddr; + memcpy(&sockaddr, &s, sizeof(sockaddr)); + + getRequest().setPeerAddr(socket.getPeeraddr()); + getRequest().setServerAddr(sockaddr); + getRequest().setSsl(true); + + setRead(); + } + + std::iostream& GnuTlsTcpjob::getStream() + { + return socket; + } + + int GnuTlsTcpjob::getFd() const + { + return socket.getFd(); + } + + void GnuTlsTcpjob::setRead() + { + socket.setTimeout(getSocketReadTimeout()); + } + + void GnuTlsTcpjob::setWrite() + { + socket.setTimeout(getSocketWriteTimeout()); + } + +#endif // USE_GNUTLS + + ////////////////////////////////////////////////////////////////////// + // Jobqueue + // + void Jobqueue::put(JobPtr j) + { + log_debug("Jobqueue::put"); + j->touch(); + + cxxtools::MutexLock lock(mutex); + + if (capacity > 0) + { + while (jobs.size() >= capacity) + { + log_warn("Jobqueue full"); + notFull.wait(lock); + } + } + + jobs.push_back(j); + + if (waitThreads == 0) + { + log_info("no waiting threads left"); + noWaitThreads.signal(); + } + + notEmpty.signal(); + } + + Jobqueue::JobPtr Jobqueue::get() + { + // wait, until a job is available + ++waitThreads; + + cxxtools::MutexLock lock(mutex); + while (jobs.empty()) + notEmpty.wait(lock); + + --waitThreads; + + log_debug("Jobqueue: fetch job " << waitThreads << " waiting threads left"); + + // take next job (queue is locked) + JobPtr j = jobs.front(); + jobs.pop_front(); + + // if there are more jobs, wake onther thread + if (!jobs.empty()) + notEmpty.signal(); + notFull.signal(); + + return j; + } + +} |