00001 namespace ThreadUtils {
00002 static void WaitAbortable(HANDLE ev, abort_callback & abort, DWORD timeout = INFINITE) {
00003 const HANDLE handles[2] = {ev, abort.get_abort_event()};
00004 SetLastError(0);
00005 const DWORD status = WaitForMultipleObjects(2, handles, FALSE, INFINITE);
00006 switch(status) {
00007 case WAIT_OBJECT_0:
00008 break;
00009 case WAIT_OBJECT_0 + 1:
00010 throw exception_aborted();
00011 default:
00012 throw exception_win32(GetLastError());
00013 }
00014 }
00015
00016 template<typename TWhat>
00017 class CObjectQueue {
00018 public:
00019 CObjectQueue() { m_event.create(true,false); }
00020
00021 template<typename TSource> void Add(const TSource & source) {
00022 insync(m_sync);
00023 m_content.add_item(source);
00024 if (m_content.get_count() == 1) m_event.set_state(true);
00025 }
00026 template<typename TDestination> void Get(TDestination & out, abort_callback & abort) {
00027 WaitAbortable(m_event.get(), abort);
00028 insync(m_sync);
00029 pfc::const_iterator<TWhat> iter = m_content.first();
00030 pfc::dynamic_assert( iter.is_valid() );
00031 out = *iter;
00032 m_content.remove(iter);
00033 if (m_content.get_count() == 0) m_event.set_state(false);
00034 }
00035
00036 private:
00037 win32_event m_event;
00038 critical_section m_sync;
00039 pfc::chain_list_v2_t<TWhat> m_content;
00040 };
00041
00042
00043 template<typename TBase>
00044 class CSingleThreadWrapper : protected pfc::thread {
00045 private:
00046 enum status {
00047 success,
00048 fail,
00049 fail_io,
00050 fail_io_data,
00051 fail_abort,
00052 };
00053 protected:
00054 class command {
00055 protected:
00056 command() : m_status(success), m_abort(), m_completionEvent() {}
00057 virtual void executeImpl(TBase &) {}
00058 virtual ~command() {}
00059 public:
00060 void execute(TBase & obj) {
00061 try {
00062 executeImpl(obj);
00063 m_status = success;
00064 } catch(exception_aborted const & e) {
00065 m_status = fail_abort; m_statusMsg = e.what();
00066 } catch(exception_io_data const & e) {
00067 m_status = fail_io_data; m_statusMsg = e.what();
00068 } catch(exception_io const & e) {
00069 m_status = fail_io; m_statusMsg = e.what();
00070 } catch(std::exception const & e) {
00071 m_status = fail; m_statusMsg = e.what();
00072 }
00073 SetEvent(m_completionEvent);
00074 }
00075 void rethrow() const {
00076 switch(m_status) {
00077 case fail:
00078 throw pfc::exception(m_statusMsg);
00079 case fail_io:
00080 throw exception_io(m_statusMsg);
00081 case fail_io_data:
00082 throw exception_io_data(m_statusMsg);
00083 case fail_abort:
00084 throw exception_aborted();
00085 case success:
00086 break;
00087 default:
00088 throw pfc::exception_bug_check_v2();
00089 }
00090 }
00091 status m_status;
00092 pfc::string8 m_statusMsg;
00093 HANDLE m_completionEvent;
00094 abort_callback * m_abort;
00095 };
00096
00097 typedef pfc::rcptr_t<command> command_ptr;
00098
00099 CSingleThreadWrapper() {
00100 m_completionEvent.create(true,false);
00101 start();
00102 }
00103
00104 ~CSingleThreadWrapper() {
00105 m_threadAbort.abort();
00106 waitTillDone();
00107 }
00108
00109 void invokeCommand(command_ptr cmd, abort_callback & abort) {
00110 abort.check();
00111 m_completionEvent.set_state(false);
00112 pfc::vartoggle_t<abort_callback*> abortToggle(cmd->m_abort, &abort);
00113 pfc::vartoggle_t<HANDLE> eventToggle(cmd->m_completionEvent, m_completionEvent.get() );
00114 m_commands.Add(cmd);
00115 m_completionEvent.wait_for(-1);
00116
00117 cmd->rethrow();
00118 }
00119
00120 private:
00121 void threadProc() {
00122 try {
00123 TBase instance;
00124 for(;;) {
00125 command_ptr cmd;
00126 m_commands.Get(cmd, m_threadAbort);
00127 cmd->execute(instance);
00128 }
00129 } catch(...) {}
00130 }
00131 win32_event m_completionEvent;
00132 CObjectQueue<command_ptr> m_commands;
00133 abort_callback_impl m_threadAbort;
00134 };
00135 }