# Indexer.pm
# ABSTRACT: Define what data is to be uploaded to elasticsearch, and handle its uploading
# PODNAME: App::Prove::Elasticsearch::Indexer
package App::Prove::Elasticsearch::Indexer;

use strict;
use warnings;

use App::Prove::Elasticsearch::Utils();
use Search::Elasticsearch();
use JSON::MaybeXS();
use File::Temp();
use File::Slurper();
use List::Util 1.45;    # 1.45 adds List::Util::uniq (any/all only need 1.33)
  12. =head1 SYNOPSIS
  13. App::Prove::Elasticsearch::Indexer::check_index({ 'server.host' => 'zippy.test', 'server.port' => 9600 });
  14. =head1 VARIABLES
  15. =head2 index (STRING)
  16. The name of the elasticsearch index used.
  17. If you are subclassing this, be aware that the Searcher plugin will rely on this.
  18. =cut
  19. our $index = 'testsuite';
  20. sub index {
  21. return $index;
  22. }
=head2 max_query_size

Number of items returned by queries.
Defaults to 1000.

=cut

our $max_query_size = 1000;

# Package-level state shared by the subroutines below:
our $e;              # Search::Elasticsearch client, constructed in check_index()
our $bulk_helper;    # lazily-built bulk-upload helper (see bulk_index_results)
our $idx;            # last document id used; lazily seeded from the server
our %stashed;        # filename => File::Temp handle of results awaiting re-upload
=head1 SUBROUTINES

=head2 check_index

Returns 1 if the index needed to be created, 0 if it's already OK.
Dies if the server cannot be reached, or the index creation fails.

=cut

sub check_index {
    my $conf = shift;

    my $port = $conf->{'server.port'} ? ':' . $conf->{'server.port'} : '';
    die "server must be specified" unless $conf->{'server.host'};
    die("port must be specified") unless $port;
    my $serveraddress = "$conf->{'server.host'}$port";

    # Re-use an already-constructed client if one exists (e.g. from a prior
    # call, or injected by tests).
    $e //= Search::Elasticsearch->new( nodes => $serveraddress, );

    #XXX for debugging
    #$e->indices->delete( index => $index );
    if ( !$e->indices->exists( index => $index ) ) {
        $e->indices->create(
            index => $index,
            body  => {
                # NOTE(review): 'index' and 'analysis' sit at the top level of
                # the body rather than under a 'settings' key — confirm this
                # matches the elasticsearch version being targeted.
                index => {
                    similarity => {
                        default => {
                            type => "classic"
                        }
                    }
                },
                analysis => {
                    analyzer => {
                        # Whitespace tokenizer plus stopword filters, so common
                        # TAP noise words don't dominate relevance scoring.
                        default => {
                            type      => "custom",
                            tokenizer => "whitespace",
                            filter    => [
                                'lowercase', 'std_english_stop', 'custom_stop'
                            ]
                        }
                    },
                    filter => {
                        std_english_stop => {
                            type      => "stop",
                            stopwords => "_english_"
                        },
                        # Words appearing in virtually every test result, and
                        # therefore carrying no signal.
                        custom_stop => {
                            type      => "stop",
                            stopwords => [ "test", "ok", "not" ]
                        }
                    }
                },
                mappings => {
                    testsuite => {
                        properties => {
                            id       => { type => "integer" },
                            elapsed  => { type => "integer" },
                            occurred => {
                                type   => "date",
                                format => "yyyy-MM-dd HH:mm:ss"
                            },
                            # The text fields below share one shape: analyzed
                            # text with fielddata + term vectors for scoring,
                            # plus a raw 'keyword' sub-field for exact matches.
                            executor => {
                                type        => "text",
                                analyzer    => "default",
                                fielddata   => "true",
                                term_vector => "yes",
                                similarity  => "classic",
                                fields      => {
                                    keyword => { type => "keyword" }
                                }
                            },
                            status => {
                                type        => "text",
                                analyzer    => "default",
                                fielddata   => "true",
                                term_vector => "yes",
                                similarity  => "classic",
                                fields      => {
                                    keyword => { type => "keyword" }
                                }
                            },
                            version => {
                                type        => "text",
                                analyzer    => "default",
                                fielddata   => "true",
                                term_vector => "yes",
                                similarity  => "classic",
                                fields      => {
                                    keyword => { type => "keyword" }
                                }
                            },
                            test_version => {
                                type        => "text",
                                analyzer    => "default",
                                fielddata   => "true",
                                term_vector => "yes",
                                similarity  => "classic",
                                fields      => {
                                    keyword => { type => "keyword" }
                                }
                            },
                            platform => {
                                type        => "text",
                                analyzer    => "default",
                                fielddata   => "true",
                                term_vector => "yes",
                                similarity  => "classic",
                                fields      => {
                                    keyword => { type => "keyword" }
                                }
                            },
                            path => {
                                type        => "text",
                                analyzer    => "default",
                                fielddata   => "true",
                                term_vector => "yes",
                                similarity  => "classic",
                                fields      => {
                                    keyword => { type => "keyword" }
                                }
                            },
                            defect => {
                                type        => "text",
                                analyzer    => "default",
                                fielddata   => "true",
                                term_vector => "yes",
                                similarity  => "classic",
                                fields      => {
                                    keyword => { type => "keyword" }
                                }
                            },
                            steps_planned => { type => "integer" },
                            # Full TAP body: analyzed only, no keyword
                            # sub-field (too large for exact matching).
                            body => {
                                type        => "text",
                                analyzer    => "default",
                                fielddata   => "true",
                                term_vector => "yes",
                                similarity  => "classic",
                            },
                            name => {
                                type        => "text",
                                analyzer    => "default",
                                fielddata   => "true",
                                term_vector => "yes",
                                similarity  => "classic",
                                fields      => {
                                    keyword => { type => "keyword" }
                                }
                            },
                            # Per-step sub-documents parsed from the TAP stream.
                            steps => {
                                properties => {
                                    number  => { type => "integer" },
                                    text    => { type => "text" },
                                    status  => { type => "text" },
                                    elapsed => { type => "integer" },
                                }
                            },
                        }
                    }
                }
            }
        );
        return 1;
    }
    return 0;
}
  192. =head2 index_results
  193. Index a test result (see L<App::Prove::Elasticsearch::Parser> for the input).
  194. =cut
  195. sub index_results {
  196. my ($result) = @_;
  197. die("check_index must be run first") unless $e;
  198. $idx //= App::Prove::Elasticsearch::Utils::get_last_index( $e, $index );
  199. $idx++;
  200. eval {
  201. $e->index(
  202. index => $index,
  203. id => $idx,
  204. type => $index,
  205. body => $result,
  206. );
  207. };
  208. if ($@) {
  209. if ( ( ref $@ ) eq "Search::Elasticsearch::Error::NoNodes" ) {
  210. print "Failed to index due to no nodes online - continuing: $@\n";
  211. _stash_result_for_retry($result);
  212. return;
  213. }
  214. }
  215. my $doc_exists =
  216. $e->exists( index => $index, type => 'testsuite', id => $idx );
  217. if ( !defined($doc_exists) || !int($doc_exists) ) {
  218. die
  219. "Failed to Index $result->{'name'}, could find no record with ID $idx\n";
  220. }
  221. print "Successfully Indexed test: $result->{'name'} with result ID $idx\n";
  222. #Now that we've proven the server is up, let's re-upload if we need to.
  223. _resubmit_stashed_documents();
  224. }
  225. sub _stash_result_for_retry {
  226. my ($result) = @_;
  227. my $encoder = JSON::MaybeXS->new( utf8 => 1 );
  228. my $dump = $encoder->encode_json($result);
  229. my ( $fh, $filename ) = File::Temp::tempfile();
  230. print $fh $dump;
  231. #Keep a hold of the File::Temp object so that when our program goes out of scope everything is cleaned up.
  232. $stashed{$filename} = $fh;
  233. }
  234. # Let's give it the "college try TM" with a bulk dump, and give up if that second try fails
  235. sub _resubmit_stashed_documents {
  236. my @files = keys(%stashed);
  237. return unless @files;
  238. my $encoder = JSON::MaybeXS->new( utf8 => 1 );
  239. my @bulk_load = map {
  240. my $out = File::Slurper::read_text($_);
  241. $encoder->decode_json($out);
  242. close $stashed{$_};
  243. delete $stashed{$_};
  244. } @files;
  245. return bulk_index_results(@bulk_load);
  246. }
  247. =head2 bulk_index_results(@results)
  248. Helper method for migration scripts.
  249. Uploads an array of results in bulk such as would be fed to index_results.
  250. It is up to the caller to chunk inputs as is appropriate for your installation.
  251. =cut
  252. sub bulk_index_results {
  253. my @results = @_;
  254. $bulk_helper //= $e->bulk_helper(
  255. index => $index,
  256. type => $index,
  257. );
  258. $idx //= App::Prove::Elasticsearch::Utils::get_last_index( $e, $index );
  259. $bulk_helper->index( map { $idx++; { id => $idx, source => $_ } }
  260. @results );
  261. $bulk_helper->flush();
  262. }
  263. =head2 associate_case_with_result(%config)
  264. Associate an indexed result with a tracked defect.
  265. Requires configuration to be inside of ENV vars already.
  266. Arguments Hash:
  267. =over 4
  268. =item B<case STRING> - case to associate defect to
  269. =item B<defects ARRAY> - defects to associate with case
  270. =item B<platforms ARRAY> - filter out any results not having these platforms
  271. =item B<versions ARRAY> - filter out any results not having these versions
  272. =back
  273. =cut
  274. sub associate_case_with_result {
  275. my %opts = @_;
  276. die("check_index must be run first") unless $e;
  277. my %q = (
  278. index => $index,
  279. body => {
  280. query => {
  281. bool => {
  282. must => [
  283. {
  284. match => {
  285. name => $opts{case},
  286. }
  287. },
  288. ],
  289. },
  290. },
  291. },
  292. );
  293. #It's normal to have multiple platforms in a document.
  294. foreach my $plat ( @{ $opts{platforms} } ) {
  295. push(
  296. @{ $q{body}{query}{bool}{must} },
  297. { match => { platform => $plat } }
  298. );
  299. }
  300. #It's NOT normal to have multiple versions in a document.
  301. foreach my $version ( @{ $opts{versions} } ) {
  302. push(
  303. @{ $q{body}{query}{bool}{should} },
  304. { match => { version => $version } }
  305. );
  306. }
  307. #Paginate the query, TODO short-circuit when we stop getting results?
  308. my $hits = App::Prove::Elasticsearch::Utils::do_paginated_query( $e,
  309. $max_query_size, %q );
  310. return 0 unless scalar(@$hits);
  311. #Now, update w/ the defect.
  312. my $failures = 0;
  313. my $attempts = 0;
  314. foreach my $hit (@$hits) {
  315. $hit->{_source}->{platform} = [ $hit->{_source}->{platform} ]
  316. if ref( $hit->{_source}->{platform} ) ne 'ARRAY';
  317. next
  318. if ( scalar( @{ $opts{versions} } ) && !$hit->{_source}->{version} );
  319. next
  320. unless List::Util::any { $hit->{_source}->{version} eq $_ }
  321. @{ $opts{versions} };
  322. next
  323. if ( scalar( @{ $opts{platforms} } )
  324. && !$hit->{_source}->{platform} );
  325. next unless List::Util::all {
  326. my $p = $_;
  327. grep { $_ eq $p } @{ $hit->{_source}->{platform} }
  328. }
  329. @{ $opts{platforms} };
  330. next unless $hit->{_source}->{name} eq $opts{case};
  331. $attempts++;
  332. #Merge the existing defects with the ones we are adding in
  333. $hit->{defect} //= [];
  334. my @df_merged =
  335. List::Util::uniq( ( @{ $hit->{defect} }, @{ $opts{defects} } ) );
  336. my %update = (
  337. index => $index,
  338. id => $hit->{_id},
  339. type => 'result',
  340. body => {
  341. doc => {
  342. defect => \@df_merged,
  343. },
  344. }
  345. );
  346. $update{body}{doc}{status} = $opts{status} if $opts{status};
  347. my $res = $e->update(%update);
  348. print "Associated cases to document $hit->{_id}\n"
  349. if $res->{result} eq 'updated';
  350. if ( !grep { $res->{result} eq $_ } qw{updated noop} ) {
  351. print
  352. "Something went wrong associating cases to document $hit->{_id}!\n$res->{result}\n";
  353. $failures++;
  354. }
  355. }
  356. print "No cases matching your query could be found. No action was taken.\n"
  357. unless $attempts;
  358. return $failures;
  359. }
1;

__END__

=head1 SPECIAL THANKS

Thanks to cPanel Inc, for graciously funding the creation of this module.