summaryrefslogtreecommitdiff
path: root/httpd/worker.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'httpd/worker.cpp')
-rw-r--r--httpd/worker.cpp364
1 files changed, 0 insertions, 364 deletions
diff --git a/httpd/worker.cpp b/httpd/worker.cpp
deleted file mode 100644
index 7552db9..0000000
--- a/httpd/worker.cpp
+++ /dev/null
@@ -1,364 +0,0 @@
-/* worker.cpp
- * Copyright (C) 2003-2005 Tommi Maekitalo
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public License as
- * published by the Free Software Foundation; either version 2 of the
- * License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * is provided AS IS, WITHOUT ANY WARRANTY; without even the implied
- * warranty of MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, and
- * NON-INFRINGEMENT. See the GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
- *
- */
-
-#include "tnt/worker.h"
-#include "tnt/dispatcher.h"
-#include "tnt/job.h"
-#include <tnt/httprequest.h>
-#include <tnt/httpreply.h>
-#include <tnt/httperror.h>
-#include <tnt/http.h>
-#include <tnt/poller.h>
-#include <cxxtools/log.h>
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <arpa/inet.h>
-#include <locale>
-#include <errno.h>
-
-log_define("tntnet.worker")
-
-namespace
-{
- static const char stateStarting[] = "0 starting";
- static const char stateWaitingForJob[] = "1 waiting for job";
- static const char stateParsing[] = "2 parsing request";
- static const char statePostParsing[] = "3 post parsing";
- static const char stateDispatch[] = "4 dispatch";
- static const char stateProcessingRequest[] = "5 processing request";
- static const char stateFlush[] = "6 flush";
- static const char stateSendReply[] = "7 send reply";
- static const char stateSendError[] = "8 send error";
- static const char stateStopping[] = "9 stopping";
-}
-
-namespace tnt
-{
- cxxtools::Mutex Worker::mutex;
- unsigned Worker::nextThreadNumber = 0;
- Worker::workers_type Worker::workers;
- unsigned Worker::maxRequestTime = 600;
- unsigned Worker::minThreads = 5;
- bool Worker::enableCompression = true;
-
- Worker::ComploaderPoolType Worker::comploaderPool;
-
- Worker::Worker(Tntnet& app)
- : application(app),
- comploaderObject(comploaderPool.get()),
- comploader(comploaderObject),
- threadId(0),
- state(stateStarting),
- lastWaitTime(0)
- {
- log_debug("initialize thread " << threadId);
-
- cxxtools::MutexLock lock(mutex);
- workers.insert(this);
- }
-
- Worker::~Worker()
- {
- cxxtools::MutexLock lock(mutex);
- workers.erase(this);
-
- log_debug("delete worker " << threadId << " - " << workers.size() << " threads left - " << application.getQueue().getWaitThreadCount() << " waiting threads");
- }
-
- void Worker::run()
- {
- threadId = pthread_self();
- Jobqueue& queue = application.getQueue();
- log_debug("start thread " << threadId);
- while (queue.getWaitThreadCount() < minThreads)
- {
- log_debug("waiting for job");
- state = stateWaitingForJob;
- Jobqueue::JobPtr j = queue.get();
- if (Tntnet::shouldStop())
- {
- log_warn("stop worker");
- break;
- }
- log_debug("got job - fd=" << j->getFd());
-
- std::iostream& socket = j->getStream();
-
- try
- {
- bool keepAlive;
- do
- {
- time(&lastWaitTime);
-
- log_debug("read request");
-
- keepAlive = false;
- state = stateParsing;
- j->getParser().parse(socket);
- state = statePostParsing;
-
- if (socket.eof())
- log_debug("eof");
- else if (j->getParser().failed())
- {
- state = stateSendError;
- log_warn("bad request");
- socket << "HTTP/1.0 500 bad request\r\n"
- "Content-Type: text/html\r\n"
- "\r\n"
- "<html><body><h1>Error</h1><p>bad request</p></body></html>"
- << std::endl;
- }
- else if (socket.fail())
- log_error("socket failed");
- else
- {
- j->getRequest().doPostParse();
-
- j->setWrite();
- keepAlive = processRequest(j->getRequest(), socket,
- j->decrementKeepAliveCounter());
-
- if (keepAlive)
- {
- j->setRead();
- j->clear();
-
- // if there is something to do and no threads waiting, we take
- // the next job just to improve resposiveness.
- if (queue.getWaitThreadCount() == 0
- && !queue.empty())
- {
- application.getPoller().addIdleJob(j);
- keepAlive = false;
- }
- else
- {
- struct pollfd fd;
- fd.fd = j->getFd();
- fd.events = POLLIN;
- if (::poll(&fd, 1, Job::getSocketReadTimeout()) == 0)
- {
- application.getPoller().addIdleJob(j);
- keepAlive = false;
- }
- }
- }
- }
- } while (keepAlive);
- }
- catch (const cxxtools::net::Timeout& e)
- {
- log_debug("timeout - put job in poller");
- application.getPoller().addIdleJob(j);
- }
- catch (const cxxtools::net::Exception& e)
- {
- if (e.getErrno() != ENOENT)
- log_warn("unexpected exception: " << e.what());
- }
- catch (const std::exception& e)
- {
- log_warn("unexpected exception: " << e.what());
- }
- }
-
- time(&lastWaitTime);
-
- log_debug("end worker-thread " << threadId);
-
- state = stateStopping;
- }
-
- bool Worker::processRequest(HttpRequest& request, std::iostream& socket,
- unsigned keepAliveCount)
- {
- // log message
- log_info("process request: " << request.getMethod() << ' ' << request.getQuery()
- << " from client " << request.getPeerIp() << " user-Agent \"" << request.getUserAgent()
- << '"');
-
- // create reply-object
- HttpReply reply(socket);
- reply.setVersion(request.getMajorVersion(), request.getMinorVersion());
- reply.setMethod(request.getMethod());
-
- std::locale loc = request.getLocale();
- reply.out().imbue(loc);
- reply.sout().imbue(loc);
-
- if (request.keepAlive())
- reply.setKeepAliveCounter(keepAliveCount);
-
- if (enableCompression)
- reply.setAcceptEncoding(request.getEncoding());
-
- // process request
- try
- {
- try
- {
- dispatch(request, reply);
-
- if (!request.keepAlive() || !reply.keepAlive())
- keepAliveCount = 0;
-
- if (keepAliveCount > 0)
- log_debug("keep alive");
- else
- {
- log_debug("no keep alive request/reply="
- << request.keepAlive() << '/' << reply.keepAlive());
- }
- }
- catch (const HttpError& e)
- {
- throw;
- }
- catch (const std::exception& e)
- {
- throw HttpError(HTTP_INTERNAL_SERVER_ERROR, e.what());
- }
- }
- catch (const HttpError& e)
- {
- state = stateSendError;
- log_warn("http-Error: " << e.what());
- HttpReply reply(socket);
- reply.setVersion(request.getMajorVersion(), request.getMinorVersion());
- if (request.keepAlive())
- reply.setKeepAliveCounter(keepAliveCount);
- else
- keepAliveCount = 0;
- reply.out() << "<html><body><h1>Error</h1><p>"
- << e.what() << "</p></body></html>" << std::endl;
- reply.sendReply(e.getErrcode(), e.getErrmsg());
- }
-
- return keepAliveCount > 0;
- }
-
- void Worker::dispatch(HttpRequest& request, HttpReply& reply)
- {
- state = stateDispatch;
- const std::string& url = request.getUrl();
-
- log_debug("dispatch " << request.getQuery());
-
- if (!HttpRequest::checkUrl(url))
- throw HttpError(HTTP_BAD_REQUEST, "illegal url");
-
- request.setThreadScope(threadScope);
-
- Dispatcher::PosType pos(application.getDispatcher(), request.getUrl());
- while (true)
- {
- state = stateDispatch;
-
- // pos.getNext() throws NotFoundException at end
- Dispatcher::CompidentType ci = pos.getNext();
- try
- {
- log_debug("load component " << ci);
- Component& comp = comploader.fetchComp(ci, application.getDispatcher());
- request.setPathInfo(ci.hasPathInfo() ? ci.getPathInfo() : url);
- request.setArgs(ci.getArgs());
-
- application.getScopemanager().preCall(request, ci.libname);
-
- log_debug("call component " << ci << " path " << request.getPathInfo());
- state = stateProcessingRequest;
- unsigned http_return = comp(request, reply, request.getQueryParams());
- if (http_return != DECLINED)
- {
- if (reply.isDirectMode())
- {
- log_info("request ready, returncode " << http_return);
- state = stateFlush;
- reply.out().flush();
- }
- else
- {
- log_info("request ready, returncode " << http_return << " - ContentSize: " << reply.getContentSize());
-
- application.getScopemanager().postCall(request, reply, ci.libname);
-
- state = stateSendReply;
- reply.sendReply(http_return);
- }
-
- if (reply.out())
- log_debug("reply sent");
- else
- log_warn("stream error");
-
- return;
- }
- else
- log_debug("component " << ci << " returned DECLINED");
- }
- catch (const cxxtools::dl::DlopenError& e)
- {
- log_warn("DlopenError catched - libname " << e.getLibname());
- }
- catch (const cxxtools::dl::SymbolNotFound& e)
- {
- log_warn("SymbolNotFound catched - symbol " << e.getSymbol());
- }
- }
-
- throw NotFoundException(request.getUrl());
- }
-
- void Worker::timer()
- {
- time_t currentTime;
- time(&currentTime);
-
- cxxtools::MutexLock lock(mutex);
- for (workers_type::iterator it = workers.begin();
- it != workers.end(); ++it)
- {
- (*it)->healthCheck(currentTime);
- }
- }
-
- void Worker::healthCheck(time_t currentTime)
- {
- if (state == stateProcessingRequest
- && lastWaitTime != 0
- && maxRequestTime > 0)
- {
- if (static_cast<unsigned>(currentTime - lastWaitTime) > maxRequestTime)
- {
- log_fatal("requesttime " << maxRequestTime << " seconds in thread "
- << threadId << " exceeded - exit process");
- log_info("current state: " << state);
- exit(111);
- }
- }
- }
-
- Worker::workers_type::size_type Worker::getCountThreads()
- {
- cxxtools::MutexLock lock(mutex);
- return workers.size();
- }
-}