EHS Embedded HTTP Server
1.5.0.132
|
00001 /* 00002 * This file has been partially derived from the WebSockets++ project at 00003 * https://github.com/zaphoyd/websocketpp which is licensed under a BSD-license. 00004 */ 00005 00006 #ifndef WSENDPOINT_H 00007 #define WSENDPOINT_H 00008 00009 #include <vector> 00010 #include <sstream> 00011 #include <iostream> 00012 #include <string> 00013 #include <boost/thread.hpp> 00014 #include "wsframe.h" 00015 00016 #ifndef HAVE_BOOST_LOCK_GUARD 00017 #include <pthread.h> 00021 class MutexHelper { 00022 public: 00029 MutexHelper(pthread_mutex_t *mutex, bool locknow = true) : 00030 m_pMutex(mutex), m_bLocked(false) 00031 { 00032 if (locknow) 00033 Lock(); 00034 } 00035 00039 ~MutexHelper() 00040 { 00041 if (m_bLocked) 00042 pthread_mutex_unlock(m_pMutex); 00043 } 00044 00048 void Lock() 00049 { 00050 pthread_mutex_lock(m_pMutex); 00051 m_bLocked = true; 00052 } 00053 00057 void Unlock() 00058 { 00059 m_bLocked = false; 00060 pthread_mutex_unlock(m_pMutex); 00061 } 00062 private: 00063 pthread_mutex_t *m_pMutex; 00064 bool m_bLocked; 00065 00066 MutexHelper(const MutexHelper &); 00067 MutexHelper & operator=(const MutexHelper &); 00068 }; 00069 #endif 00070 00071 namespace wspp { 00072 class wsendpoint; 00073 00083 class wshandler { 00084 public: 00089 void send_text(const std::string data) { 00090 send(data, frame::opcode::TEXT); 00091 } 00092 00097 void send_binary(const std::string data) { 00098 send(data, frame::opcode::BINARY); 00099 } 00100 00102 wshandler() : m_endpoint(0) {} 00103 00105 virtual ~wshandler() {} 00106 00107 private: 00108 virtual void on_message(std::string header, std::string data) = 0; 00109 virtual void on_close() = 0; 00110 virtual bool on_ping(const std::string data) = 0; 00111 virtual void on_pong(const std::string data) = 0; 00112 virtual void do_response(const std::string data) = 0; 00113 00114 void send(const std::string& payload, frame::opcode::value op); 00115 00116 // Non-copyable 00117 wshandler(const wshandler&); 00118 wshandler& operator=(const wshandler&); 00119 00120 wsendpoint *m_endpoint; 00121 friend class wsendpoint; 00122 }; 00123 00127 class wsendpoint { 00128 private: 00129 // Non-copyable 00130 wsendpoint(const wsendpoint&); 00131 wsendpoint& operator=(const wsendpoint&); 00132 00133 public: 00138 wsendpoint(wshandler *h) 00139 : m_rng(simple_rng()) 00140 , m_parser(frame::parser<simple_rng>(m_rng)) 00141 , m_state(session::state::OPEN) 00142 , m_lock() 00143 , m_handler(h) 00144 { 00145 #ifndef HAVE_BOOST_LOCK_GUARD 00146 pthread_mutexattr_t mattr; 00147 pthread_mutexattr_init(&mattr); 00148 pthread_mutexattr_settype(&mattr, PTHREAD_MUTEX_RECURSIVE); 00149 pthread_mutex_init(&m_lock, &mattr); 00150 pthread_mutexattr_destroy(&mattr); 00151 #endif 00152 m_handler->m_endpoint = this; 00153 } 00154 00155 #ifndef HAVE_BOOST_LOCK_GUARD 00156 ~wsendpoint() { pthread_mutex_destroy(&m_lock); } 00157 #endif 00158 00170 void AddRxData(std::string data) 00171 { 00172 std::istringstream s(data); 00173 while (m_state != session::state::CLOSED && s.rdbuf()->in_avail()) { 00174 try { 00175 m_parser.consume(s); 00176 if (m_parser.ready()) { 00177 if (m_parser.is_control()) { 00178 process_control(); 00179 } else { 00180 process_data(); 00181 } 00182 m_parser.reset(); 00183 } 00184 } catch (const tracing::wserror & e) { 00185 if (m_parser.ready()) { 00186 m_parser.reset(); 00187 } 00188 switch(e.code()) { 00189 case tracing::wserror::PROTOCOL_VIOLATION: 00190 send_close(close::status::PROTOCOL_ERROR,e.what()); 00191 break; 00192 case tracing::wserror::PAYLOAD_VIOLATION: 00193 send_close(close::status::INVALID_PAYLOAD,e.what()); 00194 break; 00195 case tracing::wserror::INTERNAL_ENDPOINT_ERROR: 00196 send_close(close::status::INTERNAL_ENDPOINT_ERROR,e.what()); 00197 break; 00198 case tracing::wserror::SOFT_ERROR: 00199 continue; 00200 case tracing::wserror::MESSAGE_TOO_BIG: 00201 send_close(close::status::MESSAGE_TOO_BIG,e.what()); 00202 break; 00203 case tracing::wserror::OUT_OF_MESSAGES: 00204 // we need to wait for a message to be returned by the 00205 // client. We exit the read loop. handle_read_frame 00206 // will be restarted by recycle() 00207 //m_read_state = WAITING; 00208 //m_endpoint.wait(type::shared_from_this()); 00209 return; 00210 default: 00211 // Fatal error, forcibly end connection immediately. 00212 std::cerr 00213 << "Dropping TCP due to unrecoverable exception: " << e.code() 00214 << " (" << e.what() << ")" << std::endl; 00215 shutdown(); 00216 } 00217 break; 00218 } 00219 } 00220 } 00221 00229 void send(const std::string& payload, frame::opcode::value op) { 00230 frame::parser<simple_rng> control(m_rng); 00231 control.set_opcode(op); 00232 control.set_fin(true); 00233 control.set_masked(false); 00234 control.set_payload(payload); 00235 00236 std::string tmp(control.get_header_str()); 00237 tmp.append(control.get_payload_str()); 00238 m_handler->do_response(tmp); 00239 } 00240 00241 private: 00242 void process_data() { 00243 m_handler->on_message(m_parser.get_header_str(), m_parser.get_payload_str()); 00244 } 00245 00247 00257 void shutdown() { 00258 #ifdef HAVE_BOOST_LOCK_GUARD 00259 boost::lock_guard<boost::recursive_mutex> lock(m_lock); 00260 #else 00261 MutexHelper((pthread_mutex_t *)&m_lock); 00262 #endif 00263 00264 if (m_state == session::state::CLOSED) {return;} 00265 00266 m_state = session::state::CLOSED; 00267 m_handler->on_close(); 00268 } 00269 00282 void pong(const std::vector<unsigned char> & payload) { 00283 #ifdef HAVE_BOOST_LOCK_GUARD 00284 boost::lock_guard<boost::recursive_mutex> lock(m_lock); 00285 #else 00286 MutexHelper((pthread_mutex_t *)&m_lock); 00287 #endif 00288 00289 if (m_state != session::state::OPEN) {return;} 00290 // if (m_detached) {return;} 00291 00292 // TODO: optimize control messages and handle case where 00293 // endpoint is out of messages 00294 frame::parser<simple_rng> control(m_rng); 00295 control.set_opcode(frame::opcode::PONG); 00296 control.set_fin(true); 00297 control.set_masked(false); 00298 control.set_payload(payload); 00299 00300 std::string tmp(control.get_header_str()); 00301 tmp.append(control.get_payload_str()); 00302 m_handler->do_response(tmp); 00303 } 00304 00306 00317 void send_close(close::status::value code, const std::string& reason) { 00318 #ifdef HAVE_BOOST_LOCK_GUARD 00319 boost::lock_guard<boost::recursive_mutex> lock(m_lock); 00320 #else 00321 MutexHelper((pthread_mutex_t *)&m_lock); 00322 #endif 00323 00324 // if (m_detached) {return;} 00325 00326 if (m_state != session::state::OPEN) { 00327 std::cerr << "Tried to disconnect a session that wasn't open" << std::endl; 00328 return; 00329 } 00330 00331 if (close::status::invalid(code)) { 00332 std::cerr << "Tried to close a connection with invalid close code: " 00333 << code << std::endl; 00334 return; 00335 } else if (close::status::reserved(code)) { 00336 std::cerr << "Tried to close a connection with reserved close code: " 00337 << code << std::endl; 00338 return; 00339 } 00340 00341 m_state = session::state::CLOSING; 00342 00343 frame::parser<simple_rng> control(m_rng); 00344 control.set_opcode(frame::opcode::CLOSE); 00345 control.set_fin(true); 00346 control.set_masked(false); 00347 if (code != close::status::NO_STATUS) { 00348 const uint16_t payload = htons(code); 00349 std::string pl(reinterpret_cast<const char*>(&payload), 2); 00350 pl.append(reason); 00351 control.set_payload(pl); 00352 } 00353 00354 std::string tmp(control.get_header_str()); 00355 tmp.append(control.get_payload_str()); 00356 m_handler->do_response(tmp); 00357 } 00358 00360 00365 void send_close_ack(close::status::value remote_close_code, std::string remote_close_reason) { 00366 close::status::value local_close_code; 00367 std::string local_close_reason; 00368 // echo close value unless there is a good reason not to. 00369 if (remote_close_code == close::status::NO_STATUS) { 00370 local_close_code = close::status::NORMAL; 00371 local_close_reason = ""; 00372 } else if (remote_close_code == close::status::ABNORMAL_CLOSE) { 00373 // TODO: can we possibly get here? This means send_close_ack was 00374 // called after a connection ended without getting a close 00375 // frame 00376 throw "shouldn't be here"; 00377 } else if (close::status::invalid(remote_close_code)) { 00378 // TODO: shouldn't be able to get here now either 00379 local_close_code = close::status::PROTOCOL_ERROR; 00380 local_close_reason = "Status code is invalid"; 00381 } else if (close::status::reserved(remote_close_code)) { 00382 // TODO: shouldn't be able to get here now either 00383 local_close_code = close::status::PROTOCOL_ERROR; 00384 local_close_reason = "Status code is reserved"; 00385 } else { 00386 local_close_code = remote_close_code; 00387 local_close_reason = remote_close_reason; 00388 } 00389 00390 // TODO: check whether we should cancel the current in flight write. 00391 // if not canceled the close message will be sent as soon as the 00392 // current write completes. 00393 00394 00395 frame::parser<simple_rng> control(m_rng); 00396 control.set_opcode(frame::opcode::CLOSE); 00397 control.set_fin(true); 00398 control.set_masked(false); 00399 if (local_close_code != close::status::NO_STATUS) { 00400 const uint16_t payload = htons(local_close_code); 00401 std::string pl(reinterpret_cast<const char*>(&payload), 2); 00402 pl.append(local_close_reason); 00403 control.set_payload(pl); 00404 } 00405 00406 std::string tmp(control.get_header_str()); 00407 tmp.append(control.get_payload_str()); 00408 m_handler->do_response(tmp); 00409 shutdown(); 00410 } 00411 00412 void process_control() { 00413 switch (m_parser.get_opcode()) { 00414 case frame::opcode::PING: 00415 if (m_handler->on_ping(m_parser.get_payload_str())) { 00416 pong(m_parser.get_payload()); 00417 } 00418 break; 00419 case frame::opcode::PONG: 00420 m_handler->on_pong(m_parser.get_payload_str()); 00421 break; 00422 case frame::opcode::CLOSE: 00423 // check that the codes we got over the wire are valid 00424 if (m_state == session::state::OPEN) { 00425 // other end is initiating 00426 std::cerr << "sending close ack" << std::endl; 00427 00428 // TODO: 00429 send_close_ack(m_parser.get_close_code(), m_parser.get_close_reason()); 00430 } else if (m_state == session::state::CLOSING) { 00431 // ack of our close 00432 std::cerr << "got close ack" << std::endl; 00433 shutdown(); 00434 } 00435 break; 00436 default: 00437 throw tracing::wserror("Invalid Opcode", 00438 tracing::wserror::PROTOCOL_VIOLATION); 00439 break; 00440 } 00441 } 00442 00443 private: 00444 simple_rng m_rng; 00445 frame::parser<simple_rng> m_parser; 00446 session::state::value m_state; 00447 #ifdef HAVE_BOOST_LOCK_GUARD 00448 mutable boost::recursive_mutex m_lock; 00449 #else 00450 mutable pthread_mutex_t m_lock; 00451 #endif 00452 wshandler *m_handler; 00453 }; 00454 00455 void wshandler::send(const std::string& payload, frame::opcode::value op) 00456 { 00457 if (m_endpoint) { 00458 m_endpoint->send(payload, op); 00459 } 00460 } 00461 00462 } 00463 00464 #endif