EHS Embedded HTTP Server  1.5.0.132
samples/wsendpoint.h
00001 /*
00002  * This file has been partially derived from the WebSockets++ project at
00003  * https://github.com/zaphoyd/websocketpp which is licensed under a BSD-license.
00004  */
00005 
00006 #ifndef WSENDPOINT_H
00007 #define WSENDPOINT_H
00008 
00009 #include <vector>
00010 #include <sstream>
00011 #include <iostream>
00012 #include <string>
00013 #include <boost/thread.hpp>
00014 #include "wsframe.h"
00015 
00016 #ifndef HAVE_BOOST_LOCK_GUARD
00017 #include <pthread.h>
/**
 * Scoped lock helper for a raw pthread mutex.
 *
 * The helper remembers whether it currently holds the mutex and releases
 * it in its destructor if so. It never owns the mutex object itself; the
 * caller must keep the mutex alive for the lifetime of the helper.
 */
class MutexHelper {
    public:
        /**
         * Binds the helper to a mutex and optionally locks it right away.
         *
         * @param mutex   The mutex to operate on. It is passed to the
         *                pthread functions without a null check.
         * @param locknow If true (the default), the mutex is locked by the
         *                constructor.
         */
        MutexHelper(pthread_mutex_t *mutex, bool locknow = true) :
            m_pMutex(mutex), m_bLocked(false)
        {
            if (locknow)
                Lock();
        }

        /**
         * Releases the mutex if this helper still holds it.
         */
        ~MutexHelper()
        {
            if (m_bLocked)
                pthread_mutex_unlock(m_pMutex);
        }

        /**
         * Acquires the mutex unless this helper already holds it.
         *
         * The guard keeps the lock count balanced: the destructor unlocks
         * exactly once, so a second acquisition through the same helper
         * (possible with a recursive mutex) would otherwise leave the mutex
         * locked after the helper is gone.
         */
        void Lock()
        {
            if (m_bLocked)
                return;
            pthread_mutex_lock(m_pMutex);
            m_bLocked = true;
        }

        /**
         * Releases the mutex if this helper currently holds it; otherwise
         * does nothing, so an unowned mutex is never unlocked.
         */
        void Unlock()
        {
            if (!m_bLocked)
                return;
            m_bLocked = false;
            pthread_mutex_unlock(m_pMutex);
        }
    private:
        pthread_mutex_t *m_pMutex; // Mutex operated on (not owned).
        bool m_bLocked;            // True while this helper holds the mutex.

        // Non-copyable: declared but intentionally left undefined.
        MutexHelper(const MutexHelper &);
        MutexHelper & operator=(const MutexHelper &);
};
00069 #endif
00070 
00071 namespace wspp {
00072     class wsendpoint;
00073 
00083     class wshandler {
00084         public:
00089             void send_text(const std::string data) {
00090                 send(data, frame::opcode::TEXT);
00091             }
00092 
00097             void send_binary(const std::string data) {
00098                 send(data, frame::opcode::BINARY);
00099             }
00100 
00102             wshandler() : m_endpoint(0) {}
00103 
00105             virtual ~wshandler() {}
00106 
00107         private:
00108             virtual void on_message(std::string header, std::string data) = 0;
00109             virtual void on_close() = 0;
00110             virtual bool on_ping(const std::string data) = 0;
00111             virtual void on_pong(const std::string data) = 0;
00112             virtual void do_response(const std::string data) = 0;
00113 
00114             void send(const std::string& payload, frame::opcode::value op);
00115 
00116             // Non-copyable
00117             wshandler(const wshandler&);
00118             wshandler& operator=(const wshandler&);
00119 
00120             wsendpoint *m_endpoint;
00121             friend class wsendpoint;
00122     };
00123 
00127     class wsendpoint {
00128         private:
00129             // Non-copyable
00130             wsendpoint(const wsendpoint&);
00131             wsendpoint& operator=(const wsendpoint&);
00132 
00133         public:
00138             wsendpoint(wshandler *h)
00139                 : m_rng(simple_rng())
00140                   , m_parser(frame::parser<simple_rng>(m_rng))
00141                   , m_state(session::state::OPEN)
00142                   , m_lock()
00143                   , m_handler(h)
00144         {
00145 #ifndef HAVE_BOOST_LOCK_GUARD
00146             pthread_mutexattr_t mattr;
00147             pthread_mutexattr_init(&mattr);
00148             pthread_mutexattr_settype(&mattr, PTHREAD_MUTEX_RECURSIVE);
00149             pthread_mutex_init(&m_lock, &mattr);
00150             pthread_mutexattr_destroy(&mattr);
00151 #endif
00152             m_handler->m_endpoint = this;
00153         }
00154 
00155 #ifndef HAVE_BOOST_LOCK_GUARD
00156             ~wsendpoint() { pthread_mutex_destroy(&m_lock); }
00157 #endif
00158 
00170             void AddRxData(std::string data)
00171             {
00172                 std::istringstream s(data);
00173                 while (m_state != session::state::CLOSED && s.rdbuf()->in_avail()) {
00174                     try {
00175                         m_parser.consume(s);
00176                         if (m_parser.ready()) {
00177                             if (m_parser.is_control()) {
00178                                 process_control();
00179                             } else {
00180                                 process_data();
00181                             }                   
00182                             m_parser.reset();
00183                         }
00184                     } catch (const tracing::wserror & e) {
00185                         if (m_parser.ready()) {
00186                             m_parser.reset();
00187                         }
00188                         switch(e.code()) {
00189                             case tracing::wserror::PROTOCOL_VIOLATION:
00190                                 send_close(close::status::PROTOCOL_ERROR,e.what());
00191                                 break;
00192                             case tracing::wserror::PAYLOAD_VIOLATION:
00193                                 send_close(close::status::INVALID_PAYLOAD,e.what());
00194                                 break;
00195                             case tracing::wserror::INTERNAL_ENDPOINT_ERROR:
00196                                 send_close(close::status::INTERNAL_ENDPOINT_ERROR,e.what());
00197                                 break;
00198                             case tracing::wserror::SOFT_ERROR:
00199                                 continue;
00200                             case tracing::wserror::MESSAGE_TOO_BIG:
00201                                 send_close(close::status::MESSAGE_TOO_BIG,e.what());
00202                                 break;
00203                             case tracing::wserror::OUT_OF_MESSAGES:
00204                                 // we need to wait for a message to be returned by the
00205                                 // client. We exit the read loop. handle_read_frame
00206                                 // will be restarted by recycle()
00207                                 //m_read_state = WAITING;
00208                                 //m_endpoint.wait(type::shared_from_this());
00209                                 return;
00210                             default:
00211                                 // Fatal error, forcibly end connection immediately.
00212                                 std::cerr
00213                                     << "Dropping TCP due to unrecoverable exception: " << e.code()
00214                                     << " (" << e.what() << ")" << std::endl;
00215                                 shutdown();
00216                         }
00217                         break;
00218                     }
00219                 }
00220             }
00221 
00229             void send(const std::string& payload, frame::opcode::value op) {
00230                 frame::parser<simple_rng> control(m_rng);
00231                 control.set_opcode(op);
00232                 control.set_fin(true);
00233                 control.set_masked(false);
00234                 control.set_payload(payload);
00235 
00236                 std::string tmp(control.get_header_str());
00237                 tmp.append(control.get_payload_str());
00238                 m_handler->do_response(tmp);
00239             }
00240 
00241         private:
00242             void process_data() {
00243                 m_handler->on_message(m_parser.get_header_str(), m_parser.get_payload_str());
00244             }
00245 
00247 
00257             void shutdown() {
00258 #ifdef HAVE_BOOST_LOCK_GUARD
00259                 boost::lock_guard<boost::recursive_mutex> lock(m_lock);
00260 #else
00261                 MutexHelper((pthread_mutex_t *)&m_lock);
00262 #endif
00263 
00264                 if (m_state == session::state::CLOSED) {return;}
00265 
00266                 m_state = session::state::CLOSED;
00267                 m_handler->on_close();
00268             }
00269 
00282             void pong(const std::vector<unsigned char> & payload) {
00283 #ifdef HAVE_BOOST_LOCK_GUARD
00284                 boost::lock_guard<boost::recursive_mutex> lock(m_lock);
00285 #else
00286                 MutexHelper((pthread_mutex_t *)&m_lock);
00287 #endif
00288 
00289                 if (m_state != session::state::OPEN) {return;}
00290                 // if (m_detached) {return;}
00291 
00292                 // TODO: optimize control messages and handle case where 
00293                 // endpoint is out of messages
00294                 frame::parser<simple_rng> control(m_rng);
00295                 control.set_opcode(frame::opcode::PONG);
00296                 control.set_fin(true);
00297                 control.set_masked(false);
00298                 control.set_payload(payload);
00299 
00300                 std::string tmp(control.get_header_str());
00301                 tmp.append(control.get_payload_str());
00302                 m_handler->do_response(tmp);
00303             }
00304 
00306 
00317             void send_close(close::status::value code, const std::string& reason) {
00318 #ifdef HAVE_BOOST_LOCK_GUARD
00319                 boost::lock_guard<boost::recursive_mutex> lock(m_lock);
00320 #else
00321                 MutexHelper((pthread_mutex_t *)&m_lock);
00322 #endif
00323 
00324                 // if (m_detached) {return;}
00325 
00326                 if (m_state != session::state::OPEN) {
00327                     std::cerr << "Tried to disconnect a session that wasn't open" << std::endl;
00328                     return;
00329                 }
00330 
00331                 if (close::status::invalid(code)) {
00332                     std::cerr << "Tried to close a connection with invalid close code: " 
00333                         << code << std::endl;
00334                     return;
00335                 } else if (close::status::reserved(code)) {
00336                     std::cerr << "Tried to close a connection with reserved close code: " 
00337                         << code << std::endl;
00338                     return;
00339                 }
00340 
00341                 m_state = session::state::CLOSING;
00342 
00343                 frame::parser<simple_rng> control(m_rng);
00344                 control.set_opcode(frame::opcode::CLOSE);
00345                 control.set_fin(true);
00346                 control.set_masked(false);
00347                 if (code != close::status::NO_STATUS) {
00348                     const uint16_t payload = htons(code);
00349                     std::string pl(reinterpret_cast<const char*>(&payload), 2);
00350                     pl.append(reason);
00351                     control.set_payload(pl);
00352                 }
00353 
00354                 std::string tmp(control.get_header_str());
00355                 tmp.append(control.get_payload_str());
00356                 m_handler->do_response(tmp);
00357             }
00358 
00360 
00365             void send_close_ack(close::status::value remote_close_code, std::string remote_close_reason) {
00366                 close::status::value local_close_code;
00367                 std::string local_close_reason;
00368                 // echo close value unless there is a good reason not to.
00369                 if (remote_close_code == close::status::NO_STATUS) {
00370                     local_close_code = close::status::NORMAL;
00371                     local_close_reason = "";
00372                 } else if (remote_close_code == close::status::ABNORMAL_CLOSE) {
00373                     // TODO: can we possibly get here? This means send_close_ack was
00374                     //       called after a connection ended without getting a close
00375                     //       frame
00376                     throw "shouldn't be here";
00377                 } else if (close::status::invalid(remote_close_code)) {
00378                     // TODO: shouldn't be able to get here now either
00379                     local_close_code = close::status::PROTOCOL_ERROR;
00380                     local_close_reason = "Status code is invalid";
00381                 } else if (close::status::reserved(remote_close_code)) {
00382                     // TODO: shouldn't be able to get here now either
00383                     local_close_code = close::status::PROTOCOL_ERROR;
00384                     local_close_reason = "Status code is reserved";
00385                 } else {
00386                     local_close_code = remote_close_code;
00387                     local_close_reason = remote_close_reason;
00388                 }
00389 
00390                 // TODO: check whether we should cancel the current in flight write.
00391                 //       if not canceled the close message will be sent as soon as the
00392                 //       current write completes.
00393 
00394 
00395                 frame::parser<simple_rng> control(m_rng);
00396                 control.set_opcode(frame::opcode::CLOSE);
00397                 control.set_fin(true);
00398                 control.set_masked(false);
00399                 if (local_close_code != close::status::NO_STATUS) {
00400                     const uint16_t payload = htons(local_close_code);
00401                     std::string pl(reinterpret_cast<const char*>(&payload), 2);
00402                     pl.append(local_close_reason);
00403                     control.set_payload(pl);
00404                 }
00405 
00406                 std::string tmp(control.get_header_str());
00407                 tmp.append(control.get_payload_str());
00408                 m_handler->do_response(tmp);
00409                 shutdown();
00410             }
00411 
00412             void process_control() {
00413                 switch (m_parser.get_opcode()) {
00414                     case frame::opcode::PING:
00415                         if (m_handler->on_ping(m_parser.get_payload_str())) {
00416                             pong(m_parser.get_payload());
00417                         }
00418                         break;
00419                     case frame::opcode::PONG:
00420                         m_handler->on_pong(m_parser.get_payload_str());
00421                         break;
00422                     case frame::opcode::CLOSE:
00423                         // check that the codes we got over the wire are valid
00424                         if (m_state == session::state::OPEN) {
00425                             // other end is initiating
00426                             std::cerr << "sending close ack" << std::endl;
00427 
00428                             // TODO:
00429                             send_close_ack(m_parser.get_close_code(), m_parser.get_close_reason());
00430                         } else if (m_state == session::state::CLOSING) {
00431                             // ack of our close
00432                             std::cerr << "got close ack" << std::endl;
00433                             shutdown();
00434                         }
00435                         break;
00436                     default:
00437                         throw tracing::wserror("Invalid Opcode",
00438                                 tracing::wserror::PROTOCOL_VIOLATION);
00439                         break;
00440                 }
00441             }
00442 
00443         private:
00444             simple_rng m_rng;
00445             frame::parser<simple_rng> m_parser;
00446             session::state::value m_state;
00447 #ifdef HAVE_BOOST_LOCK_GUARD
00448             mutable boost::recursive_mutex m_lock;
00449 #else
00450             mutable pthread_mutex_t m_lock;
00451 #endif
00452             wshandler *m_handler;
00453     };
00454 
00455     void wshandler::send(const std::string& payload, frame::opcode::value op)
00456     {
00457         if (m_endpoint) {
00458             m_endpoint->send(payload, op);
00459         }
00460     }
00461 
00462 }
00463 
00464 #endif