From 566747af00ae413c942a7c6702e24c044af36f17 Mon Sep 17 00:00:00 2001 From: Thomas Date: Mon, 14 Oct 2013 21:57:53 +0200 Subject: First attempt to search in multiple folders; do it multi-threaded using pthreads if available --- program/lib/Roundcube/rcube_imap_search.php | 327 ++++++++++++++++++++++++++++ 1 file changed, 327 insertions(+) create mode 100644 program/lib/Roundcube/rcube_imap_search.php (limited to 'program/lib/Roundcube/rcube_imap_search.php') diff --git a/program/lib/Roundcube/rcube_imap_search.php b/program/lib/Roundcube/rcube_imap_search.php new file mode 100644 index 000000000..ed4face98 --- /dev/null +++ b/program/lib/Roundcube/rcube_imap_search.php @@ -0,0 +1,327 @@ + | + +-----------------------------------------------------------------------+ +*/ + +// create classes defined by the pthreads module if that isn't installed +if (!defined('PTHREADS_INHERIT_ALL')) { + class Worker { } + class Stackable { } +} + +/** + * Class to control search jobs on multiple IMAP folders. + * This implement a simple threads pool using the pthreads extension. + * + * @package Framework + * @subpackage Storage + * @author Thomas Bruederli + */ +class rcube_imap_search +{ + public $options = array(); + + private $size = 10; + private $next = 0; + private $workers = array(); + private $states = array(); + private $jobs = array(); + private $conn; + + /** + * Default constructor + */ + public function __construct($options, $conn) + { + $this->options = $options; + $this->conn = $conn; + } + + /** + * Invoke search request to IMAP server + * + * @param array $folders List of IMAP folders to search in + * @param string $str Search criteria + * @param string $charset Search charset + * @param string $sort_field Header field to sort by + * @param boolean $threading True if threaded listing is active + */ + public function exec($folders, $str, $charset = null, $sort_field = null, $threading=null) + { + $pthreads = defined('PTHREADS_INHERIT_ALL'); + + // start a search job for every folder to search in + foreach ($folders as $folder) { + $job = new rcube_imap_search_job($folder, $str, $charset, $sort_field, $threading); + if ($pthreads && $this->submit($job)) { + $this->jobs[] = $job; + } + else { + $job->worker = $this; + $job->run(); + $this->jobs[] = $job; + } + } + + // wait for all workers to be done + $this->shutdown(); + + // gather results + $results = new rcube_result_multifolder; + foreach ($this->jobs as $job) { + $results->add($job->get_result()); + } + + return $results; + } + + /** + * Assign the given job object to one of the worker threads for execution + */ + public function submit(Stackable $job) + { + if (count($this->workers) < $this->size) { + $id = count($this->workers); + $this->workers[$id] = new rcube_imap_search_worker($id, $this->options); + $this->workers[$id]->start(PTHREADS_INHERIT_ALL); + + if ($this->workers[$id]->stack($job)) { + return $job; + } + else { + // trigger_error(sprintf("Failed to push Stackable onto %s", $id), E_USER_WARNING); + } + } + if (($worker = $this->workers[$this->next])) { + $this->next = ($this->next+1) % $this->size; + if ($worker->stack($job)) { + return $job; + } + else { + // trigger_error(sprintf("Failed to stack onto selected worker %s", $worker->id), E_USER_WARNING); + } + } + else { + // trigger_error(sprintf("Failed to select a worker for Stackable"), E_USER_WARNING); + } + + return false; + } + + /** + * Shutdown the pool of threads cleanly, retaining exit status locally + */ + public function shutdown() + { + foreach ($this->workers as $worker) { + $this->states[$worker->getThreadId()] = $worker->shutdown(); + $worker->close(); + } + + # console('shutdown', $this->states); + } + + /** + * Get connection to the IMAP server + * (used for single-thread mode) + */ + public function get_imap() + { + return $this->conn; + } +} + + +/** + * Stackable item to run the search on a specific IMAP folder + */ +class rcube_imap_search_job extends Stackable +{ + private $folder; + private $search; + private $charset; + private $sort_field; + private $threading; + private $searchset; + private $result; + private $pagesize = 100; + + public function __construct($folder, $str, $charset = null, $sort_field = null, $threading=false) + { + $this->folder = $folder; + $this->search = $str; + $this->charset = $charset; + $this->sort_field = $sort_field; + $this->threading = $threading; + } + + public function run() + { + #trigger_error("Start search $this->folder", E_USER_NOTICE); + $this->result = $this->search_index(); + #trigger_error("End search $this->folder: " . $this->result->count(), E_USER_NOTICE); + } + + /** + * Copy of rcube_imap::search_index() + */ + protected function search_index() + { + $criteria = $this->search; + $charset = $this->charset; + + $imap = $this->worker->get_imap(); + + if (!$imap->connected()) { + if ($this->threading) { + return new rcube_result_thread(); + } + else { + return new rcube_result_index(); + } + } + + if ($this->worker->options['skip_deleted'] && !preg_match('/UNDELETED/', $criteria)) { + $criteria = 'UNDELETED '.$criteria; + } + + // unset CHARSET if criteria string is ASCII, this way + // SEARCH won't be re-sent after "unsupported charset" response + if ($charset && $charset != 'US-ASCII' && is_ascii($criteria)) { + $charset = 'US-ASCII'; + } + + if ($this->threading) { + $threads = $imap->thread($this->folder, $this->threading, $criteria, true, $charset); + + // Error, try with US-ASCII (RFC5256: SORT/THREAD must support US-ASCII and UTF-8, + // but I've seen that Courier doesn't support UTF-8) + if ($threads->is_error() && $charset && $charset != 'US-ASCII') { + $threads = $imap->thread($this->folder, $this->threading, + rcube_imap::convert_criteria($criteria, $charset), true, 'US-ASCII'); + } + + return $threads; + } + + if ($this->sort_field) { + $messages = $imap->sort($this->folder, $this->sort_field, $criteria, true, $charset); + + // Error, try with US-ASCII (RFC5256: SORT/THREAD must support US-ASCII and UTF-8, + // but I've seen Courier with disabled UTF-8 support) + if ($messages->is_error() && $charset && $charset != 'US-ASCII') { + $messages = $imap->sort($this->folder, $this->sort_field, + rcube_imap::convert_criteria($criteria, $charset), true, 'US-ASCII'); + } + + if (!$messages->is_error()) { + return $messages; + } + } + + $messages = $imap->search($this->folder, + ($charset && $charset != 'US-ASCII' ? "CHARSET $charset " : '') . $criteria, true); + + // Error, try with US-ASCII (some servers may support only US-ASCII) + if ($messages->is_error() && $charset && $charset != 'US-ASCII') { + $messages = $imap->search($this->folder, + rcube_imap::convert_criteria($criteria, $charset), true); + } + + return $messages; + } + + public function get_search_set() + { + return array( + $this->search, + $this->result, + $this->charset, + $this->sort_field, + $this->threading, + ); + } + + public function get_result() + { + return $this->result; + } +} + + +/** + * Wrker thread to run search jobs while maintaining a common context + */ +class rcube_imap_search_worker extends Worker +{ + public $id; + public $options; + + private $conn; + + /** + * Default constructor + */ + public function __construct($id, $options) + { + $this->id = $id; + $this->options = $options; + } + + /** + * Get a dedicated connection to the IMAP server + */ + public function get_imap() + { + // TODO: make this connection persistent for several jobs + #if ($this->conn) + # return $this->conn; + + $conn = new rcube_imap_generic(); + # $conn->setDebug(true, function($conn, $message){ trigger_error($message, E_USER_NOTICE); }); + + if ($this->options['user'] && $this->options['password']) { + $conn->connect($this->options['host'], $this->options['user'], $this->options['password'], $this->options); + } + + if ($conn->error) + trigger_error($this->conn->error, E_USER_WARNING); + + #$this->conn = $conn; + return $conn; + } + + /** + * @override + */ + public function run() + { + + } + + /** + * Close IMAP connection + */ + public function close() + { + if ($this->conn) { + $this->conn->close(); + } + } +} + -- cgit v1.2.3 From b6e24c6946606cd504d522451c36b6dc574fe75d Mon Sep 17 00:00:00 2001 From: Thomas Bruederli Date: Tue, 15 Oct 2013 11:44:34 +0200 Subject: Minor improvements to threaded searching --- program/lib/Roundcube/rcube_imap.php | 8 +++---- program/lib/Roundcube/rcube_imap_search.php | 36 ++++++++++++++++++----------- 2 files changed, 26 insertions(+), 18 deletions(-) (limited to 'program/lib/Roundcube/rcube_imap_search.php') diff --git a/program/lib/Roundcube/rcube_imap.php b/program/lib/Roundcube/rcube_imap.php index db94e7678..0cf34b2ca 100644 --- a/program/lib/Roundcube/rcube_imap.php +++ b/program/lib/Roundcube/rcube_imap.php @@ -969,7 +969,7 @@ class rcube_imap extends rcube_storage $to = $from + $page_size; // sort headers - if (!$this->threading) { + if (!$this->threading && !empty($a_msg_headers)) { $a_msg_headers = $this->conn->sortHeaders($a_msg_headers, $this->sort_field, $this->sort_order); } @@ -1476,10 +1476,8 @@ class rcube_imap extends rcube_storage new rcube_result_index; // trigger autoloader and make these classes available for threaded context new rcube_result_thread; - // connect IMAP - if (!defined('PTHREADS_INHERIT_ALL')) { - $this->check_connection(); - } + // connect IMAP to have all the required classes and settings loaded + $this->check_connection(); $searcher = new rcube_imap_search($this->options, $this->conn); $results = $searcher->exec( diff --git a/program/lib/Roundcube/rcube_imap_search.php b/program/lib/Roundcube/rcube_imap_search.php index ed4face98..d82ec8a24 100644 --- a/program/lib/Roundcube/rcube_imap_search.php +++ b/program/lib/Roundcube/rcube_imap_search.php @@ -172,9 +172,9 @@ class rcube_imap_search_job extends Stackable public function run() { - #trigger_error("Start search $this->folder", E_USER_NOTICE); + // trigger_error("Start search $this->folder", E_USER_NOTICE); $this->result = $this->search_index(); - #trigger_error("End search $this->folder: " . $this->result->count(), E_USER_NOTICE); + // trigger_error("End search $this->folder: " . $this->result->count(), E_USER_NOTICE); } /** @@ -182,6 +182,7 @@ class rcube_imap_search_job extends Stackable */ protected function search_index() { + $pthreads = defined('PTHREADS_INHERIT_ALL'); $criteria = $this->search; $charset = $this->charset; @@ -216,6 +217,10 @@ class rcube_imap_search_job extends Stackable rcube_imap::convert_criteria($criteria, $charset), true, 'US-ASCII'); } + // close IMAP connection again + if ($pthreads) + $imap->closeConnection(); + return $threads; } @@ -228,21 +233,23 @@ class rcube_imap_search_job extends Stackable $messages = $imap->sort($this->folder, $this->sort_field, rcube_imap::convert_criteria($criteria, $charset), true, 'US-ASCII'); } - - if (!$messages->is_error()) { - return $messages; - } } - $messages = $imap->search($this->folder, - ($charset && $charset != 'US-ASCII' ? "CHARSET $charset " : '') . $criteria, true); - - // Error, try with US-ASCII (some servers may support only US-ASCII) - if ($messages->is_error() && $charset && $charset != 'US-ASCII') { + if (!$messages || !$messages->is_error()) { $messages = $imap->search($this->folder, - rcube_imap::convert_criteria($criteria, $charset), true); + ($charset && $charset != 'US-ASCII' ? "CHARSET $charset " : '') . $criteria, true); + + // Error, try with US-ASCII (some servers may support only US-ASCII) + if ($messages->is_error() && $charset && $charset != 'US-ASCII') { + $messages = $imap->search($this->folder, + rcube_imap::convert_criteria($criteria, $charset), true); + } } + // close IMAP connection again + if ($pthreads) + $imap->closeConnection(); + return $messages; } @@ -279,6 +286,8 @@ class rcube_imap_search_worker extends Worker */ public function __construct($id, $options) { + $options['ident']['command'] = 'search-'.$id; + $this->id = $id; $this->options = $options; } @@ -296,11 +305,12 @@ class rcube_imap_search_worker extends Worker # $conn->setDebug(true, function($conn, $message){ trigger_error($message, E_USER_NOTICE); }); if ($this->options['user'] && $this->options['password']) { + // TODO: do this synchronized to avoid warnings like "Only one Id allowed in non-authenticated state" $conn->connect($this->options['host'], $this->options['user'], $this->options['password'], $this->options); } if ($conn->error) - trigger_error($this->conn->error, E_USER_WARNING); + trigger_error($conn->error, E_USER_WARNING); #$this->conn = $conn; return $conn; -- cgit v1.2.3 From d53b60406c8070f363d42b32a21670ae68f56cc1 Mon Sep 17 00:00:00 2001 From: Thomas Bruederli Date: Thu, 16 Jan 2014 11:12:43 +0100 Subject: Fix typos --- program/lib/Roundcube/rcube_imap_search.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'program/lib/Roundcube/rcube_imap_search.php') diff --git a/program/lib/Roundcube/rcube_imap_search.php b/program/lib/Roundcube/rcube_imap_search.php index d82ec8a24..70a11bc1c 100644 --- a/program/lib/Roundcube/rcube_imap_search.php +++ b/program/lib/Roundcube/rcube_imap_search.php @@ -235,7 +235,7 @@ class rcube_imap_search_job extends Stackable } } - if (!$messages || !$messages->is_error()) { + if (!$messages || $messages->is_error()) { $messages = $imap->search($this->folder, ($charset && $charset != 'US-ASCII' ? "CHARSET $charset " : '') . $criteria, true); @@ -272,7 +272,7 @@ class rcube_imap_search_job extends Stackable /** - * Wrker thread to run search jobs while maintaining a common context + * Worker thread to run search jobs while maintaining a common context */ class rcube_imap_search_worker extends Worker { -- cgit v1.2.3 From d93ce5cde23b7170b96fd9816e8d5e8cfdf6e0f6 Mon Sep 17 00:00:00 2001 From: Thomas Bruederli Date: Tue, 21 Jan 2014 17:18:28 +0100 Subject: Fix concurrent connections to IMAP while searching --- program/lib/Roundcube/rcube_imap_search.php | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) (limited to 'program/lib/Roundcube/rcube_imap_search.php') diff --git a/program/lib/Roundcube/rcube_imap_search.php b/program/lib/Roundcube/rcube_imap_search.php index 70a11bc1c..c88198140 100644 --- a/program/lib/Roundcube/rcube_imap_search.php +++ b/program/lib/Roundcube/rcube_imap_search.php @@ -189,6 +189,8 @@ class rcube_imap_search_job extends Stackable $imap = $this->worker->get_imap(); if (!$imap->connected()) { + trigger_error("No IMAP connection for $this->folder", E_USER_WARNING); + if ($this->threading) { return new rcube_result_thread(); } @@ -280,14 +282,13 @@ class rcube_imap_search_worker extends Worker public $options; private $conn; + private $counts = 0; /** * Default constructor */ public function __construct($id, $options) { - $options['ident']['command'] = 'search-'.$id; - $this->id = $id; $this->options = $options; } @@ -298,21 +299,19 @@ class rcube_imap_search_worker extends Worker public function get_imap() { // TODO: make this connection persistent for several jobs - #if ($this->conn) - # return $this->conn; + // This doesn't seem to work. Socket connections don't survive serialization which is used in pthreads $conn = new rcube_imap_generic(); # $conn->setDebug(true, function($conn, $message){ trigger_error($message, E_USER_NOTICE); }); if ($this->options['user'] && $this->options['password']) { - // TODO: do this synchronized to avoid warnings like "Only one Id allowed in non-authenticated state" + $this->options['ident']['command'] = 'search-' . $this->id . 't' . ++$this->counts; $conn->connect($this->options['host'], $this->options['user'], $this->options['password'], $this->options); } if ($conn->error) trigger_error($conn->error, E_USER_WARNING); - #$this->conn = $conn; return $conn; } -- cgit v1.2.3