| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436 |
- # ABSTRACT: Define what data is to be uploaded to elasticsearch, and handle its uploading
- # PODNAME: App::Prove::Elasticsearch::Indexer
- package App::Prove::Elasticsearch::Indexer;
- use strict;
- use warnings;
- use App::Prove::Elasticsearch::Utils();
- use Search::Elasticsearch();
- use JSON::MaybeXS();
- use File::Temp();
- use File::Slurper();
- use List::Util 1.33;
- =head1 SYNOPSIS
- App::Prove::Elasticsearch::Indexer::check_index({ 'server.host' => 'zippy.test', 'server.port' => 9600 });
- =head1 VARIABLES
- =head2 index (STRING)
- The name of the elasticsearch index used.
- If you are subclassing this, be aware that the Searcher plugin will rely on this.
- =cut
our $index = 'testsuite';

# Accessor for $index.  Arguments are ignored, so it can be invoked either
# as a plain function or as a class method.
sub index { return $index }
- =head2 max_query_size
- Number of items returned by queries.
- Defaults to 1000.
- =cut
# Page size handed to Utils::do_paginated_query() by
# associate_case_with_result().
our $max_query_size = 1000;

# State shared between the subroutines of this package:
#   $e           - Search::Elasticsearch client, created on first use by
#                  check_index()
#   $bulk_helper - bulk helper obtained from $e on first use by
#                  bulk_index_results()
#   $idx         - ID of the most recently indexed document; seeded from
#                  Utils::get_last_index() and incremented per document
#   %stashed     - temp-file name => open filehandle, one entry per result
#                  waiting to be re-uploaded
our ( $e, $bulk_helper, $idx );
our %stashed;
- =head1 SUBROUTINES
- =head2 check_index
- Returns 1 if the index needed to be created, 0 if it's already OK.
- Dies if the server cannot be reached, or the index creation fails.
- =cut
# Build the mapping used by every analyzed free-text field.  A fresh hash is
# returned on each call so no two fields share a reference.
#
# Options:
#   keyword - when true, add the un-analyzed "keyword" sub-field.
sub _analyzed_text_mapping {
    my (%opt) = @_;

    my %mapping = (
        type        => "text",
        analyzer    => "default",
        fielddata   => "true",
        term_vector => "yes",
        similarity  => "classic",
    );
    $mapping{fields} = { keyword => { type => "keyword" } }
      if $opt{keyword};

    return \%mapping;
}

# Make sure the index named by $index exists, creating it together with its
# analysis configuration and document mapping when it does not.
#
# Arguments:
#   $conf - HASHREF; 'server.host' and 'server.port' are both required.
#
# Returns 1 when the index had to be created, 0 when it was already there.
# Dies when the host or port is missing; errors thrown by the client
# propagate to the caller.
#
# The client is cached in $e, so only the first call's $conf is used to
# connect.
sub check_index {
    my ($conf) = @_;

    die "server must be specified" unless $conf->{'server.host'};
    die("port must be specified")  unless $conf->{'server.port'};
    my $serveraddress = "$conf->{'server.host'}:$conf->{'server.port'}";

    $e //= Search::Elasticsearch->new( nodes => $serveraddress, );

    #XXX for debugging
    #$e->indices->delete( index => $index );

    return 0 if $e->indices->exists( index => $index );

    my %properties = (
        id            => { type => "integer" },
        elapsed       => { type => "integer" },
        steps_planned => { type => "integer" },
        occurred      => {
            type   => "date",
            format => "yyyy-MM-dd HH:mm:ss",
        },

        # The raw test output is searchable but has no keyword sub-field.
        body  => _analyzed_text_mapping(),
        steps => {
            properties => {
                number  => { type => "integer" },
                text    => { type => "text" },
                status  => { type => "text" },
                elapsed => { type => "integer" },
            },
        },
    );

    # These fields are both full-text searchable and exactly matchable.
    $properties{$_} = _analyzed_text_mapping( keyword => 1 )
      for qw(executor status version test_version platform path defect name);

    # NOTE(review): `index` and `analysis` are sent at the top level of the
    # body rather than under a `settings` key -- confirm the targeted
    # Elasticsearch version honours them there.
    $e->indices->create(
        index => $index,
        body  => {
            index => {
                similarity => {
                    default => { type => "classic" },
                },
            },
            analysis => {
                analyzer => {
                    default => {
                        type      => "custom",
                        tokenizer => "whitespace",
                        filter    => [
                            'lowercase', 'std_english_stop', 'custom_stop'
                        ],
                    },
                },
                filter => {
                    std_english_stop => {
                        type      => "stop",
                        stopwords => "_english_",
                    },
                    custom_stop => {
                        type      => "stop",
                        stopwords => [ "test", "ok", "not" ],
                    },
                },
            },

            # Documents are indexed with `type => $index`, so the mapping is
            # keyed by $index too instead of a hard-coded 'testsuite'.
            mappings => {
                $index => { properties => \%properties },
            },
        },
    );

    return 1;
}
- =head2 index_results
- Index a test result (see L<App::Prove::Elasticsearch::Parser> for the input).
- =cut
# Index a single test result under the next free document ID.
#
# Arguments:
#   $result - HASHREF describing the test result; sent as the document body.
#
# Behaviour:
#   * Dies unless check_index() has set up the client in $e.
#   * When the client reports that no nodes are online, the result is
#     stashed for a later retry and the sub returns without dying.
#   * Otherwise the document must exist after the indexing call, or the sub
#     dies.  Any error raised by the indexing call is included in that
#     message instead of being lost.
#   * After a confirmed upload, any previously stashed results are
#     resubmitted.
sub index_results {
    my ($result) = @_;

    die("check_index must be run first") unless $e;

    $idx //= App::Prove::Elasticsearch::Utils::get_last_index( $e, $index );
    $idx++;

    # The trailing `1` lets us tell "eval died" apart from "index() returned
    # something false"; $@ is copied at once because later code may reset it.
    my $index_error;
    my $indexed = eval {
        $e->index(
            index => $index,
            id    => $idx,
            type  => $index,
            body  => $result,
        );
        1;
    };
    if ( !$indexed ) {
        $index_error = $@;
        if ( ref($index_error) eq "Search::Elasticsearch::Error::NoNodes" ) {
            print
              "Failed to index due to no nodes online - continuing: $index_error\n";
            _stash_result_for_retry($result);
            return;
        }

        # Any other error: fall through to the existence check, which is the
        # authority on whether the document made it.
    }

    # Look the document up under the same type it was indexed with.
    my $doc_exists =
      $e->exists( index => $index, type => $index, id => $idx );
    if ( !defined($doc_exists) || !int($doc_exists) ) {
        my $reason =
          defined $index_error ? "Indexing error was: $index_error\n" : '';
        die
          "Failed to Index $result->{'name'}, could find no record with ID $idx\n$reason";
    }

    print "Successfully Indexed test: $result->{'name'} with result ID $idx\n";

    #Now that we've proven the server is up, let's re-upload if we need to.
    _resubmit_stashed_documents();
}
# Persist a result that could not be uploaded so it can be retried later.
#
# Arguments:
#   $result - HASHREF describing the test result.
#
# The result is written as UTF-8 encoded JSON to a new temporary file and
# recorded in %stashed as filename => open filehandle.  Dies if the file
# cannot be created or written.
sub _stash_result_for_retry {
    my ($result) = @_;

    require IO::Handle;    # provides ->flush on the lexical handle

    # encode() is the object method; encode_json() is the functional
    # interface and does not work when called on an encoder instance.
    my $dump = JSON::MaybeXS->new( utf8 => 1 )->encode($result);

    # tempfile() in list context keeps the file at exit unless asked to
    # unlink it.
    my ( $fh, $filename ) = File::Temp::tempfile( UNLINK => 1 );
    binmode($fh);          # $dump is already encoded bytes
    print {$fh} $dump
      or die "Could not write stashed result to $filename: $!";

    # The file is read back by name while this handle is still open, so the
    # data must reach the disk now rather than at close time.
    $fh->flush()
      or die "Could not flush stashed result to $filename: $!";

    #Keep the handle open until the result has been resubmitted.
    $stashed{$filename} = $fh;
    return;
}
# Let's give it the "college try TM" with a bulk dump, and give up if that second try fails
#
# Re-upload every result recorded in %stashed.
#
# Each stash file is closed, read back, decoded from UTF-8 JSON, removed
# from %stashed and deleted from disk; the decoded documents are then handed
# to bulk_index_results() in one call.
#
# Returns nothing when there is nothing stashed, otherwise whatever
# bulk_index_results() returns.  Read or decode errors propagate.
sub _resubmit_stashed_documents {
    my @files = keys(%stashed);
    return unless @files;

    my $decoder = JSON::MaybeXS->new( utf8 => 1 );
    my @bulk_load;
    for my $file (@files) {

        # Close before reading: this flushes anything still buffered in the
        # write handle.
        my $fh = delete $stashed{$file};
        close $fh if defined $fh && defined fileno($fh);

        # The decoder was built with utf8 => 1 and therefore wants raw bytes,
        # not text that has already been decoded.
        push @bulk_load, $decoder->decode( File::Slurper::read_binary($file) );

        unlink $file;
    }

    return bulk_index_results(@bulk_load);
}
- =head2 bulk_index_results(@results)
- Helper method for migration scripts.
- Uploads an array of results in bulk such as would be fed to index_results.
- It is up to the caller to chunk inputs as is appropriate for your installation.
- =cut
# Upload a list of results in one bulk request.
#
# Arguments:
#   @results - HASHREFs such as would be passed to index_results().
#
# Each result gets the next document ID after $idx.  Returns whatever the
# bulk helper's flush() returns.  Dies unless check_index() has set up the
# client in $e.
sub bulk_index_results {
    my @results = @_;

    # Same precondition as the other entry points; without it the failure is
    # a "method on undefined value" error.
    die("check_index must be run first") unless $e;

    $bulk_helper //= $e->bulk_helper(
        index => $index,
        type  => $index,
    );

    $idx //= App::Prove::Elasticsearch::Utils::get_last_index( $e, $index );

    # `+{` makes it unambiguous that each action is a hash reference.
    my @actions = map { +{ id => ++$idx, source => $_ } } @results;
    $bulk_helper->index(@actions);

    return $bulk_helper->flush();
}
- =head2 associate_case_with_result(%config)
- Associate an indexed result with a tracked defect.
- Requires configuration to be inside of ENV vars already.
- Arguments Hash:
- =over 4
- =item B<case STRING> - case to associate defect to
- =item B<defects ARRAY> - defects to associate with case
- =item B<platforms ARRAY> - filter out any results not having these platforms
- =item B<versions ARRAY> - filter out any results not having these versions
- =back
- =cut
# Attach defects (and optionally a status) to already-indexed results.
#
# Options (hash):
#   case      - STRING, name of the test case; hits must match it exactly.
#   defects   - ARRAYREF of defects to add to each matching result.
#   platforms - ARRAYREF; a result must carry every listed platform.
#   versions  - ARRAYREF; a result's version must equal one of them.
#               When empty or omitted, no version filtering is done.
#   status    - optional STRING; when true, also overwrites the status.
#
# Returns the number of updates whose outcome was neither 'updated' nor
# 'noop' (0 when the query returned no hits).  Dies unless check_index()
# has set up the client in $e.
sub associate_case_with_result {
    my %opts = @_;

    die("check_index must be run first") unless $e;

    # Treat an omitted list option exactly like an empty list.
    my @platforms = @{ $opts{platforms} // [] };
    my @versions  = @{ $opts{versions}  // [] };
    my @defects   = @{ $opts{defects}   // [] };

    #It's normal to have multiple platforms in a document.
    my %q = (
        index => $index,
        body  => {
            query => {
                bool => {
                    must => [
                        { match => { name => $opts{case} } },
                        map { +{ match => { platform => $_ } } } @platforms,
                    ],
                },
            },
        },
    );

    #It's NOT normal to have multiple versions in a document.
    $q{body}{query}{bool}{should} =
      [ map { +{ match => { version => $_ } } } @versions ]
      if @versions;

    #Paginate the query, TODO short-circuit when we stop getting results?
    my $hits = App::Prove::Elasticsearch::Utils::do_paginated_query( $e,
        $max_query_size, %q );
    return 0 unless scalar(@$hits);

    #Now, update w/ the defect.
    my $failures = 0;
    my $attempts = 0;
  HIT:
    foreach my $hit (@$hits) {
        my $source = $hit->{_source};

        # Every hit is re-checked here against the exact values requested.
        next HIT unless ( $source->{name} // '' ) eq ( $opts{case} // '' );

        # Only filter on version when the caller asked for specific ones;
        # `any` over an empty list is false and would reject every hit.
        if (@versions) {
            my $hit_version = $source->{version};
            next HIT unless defined $hit_version;
            next HIT
              unless List::Util::any { $hit_version eq $_ } @versions;
        }

        # A document may hold one platform (scalar) or several (array).
        my $platform = $source->{platform};
        my @hit_platforms =
            ref($platform) eq 'ARRAY' ? @$platform
          : defined $platform         ? ($platform)
          :                             ();
        my %on_platform = map { $_ => 1 } grep { defined } @hit_platforms;
        next HIT unless List::Util::all { $on_platform{$_} } @platforms;

        $attempts++;

        #Merge the existing defects with the ones we are adding in.
        # The stored defects are read from the hit's _source.
        my $existing = $source->{defect};
        my @existing_defects =
            ref($existing) eq 'ARRAY' ? @$existing
          : defined $existing         ? ($existing)
          :                             ();

        # First-seen de-duplication; List::Util::uniq would need 1.45 while
        # this file only requires 1.33.
        my %seen;
        my @df_merged =
          grep { defined && !$seen{$_}++ } ( @existing_defects, @defects );

        my %update = (
            index => $index,
            id    => $hit->{_id},

            # Address the document under the type the hit itself reports
            # (presumably always present in search hits -- confirm), and fall
            # back to the previously hard-coded value otherwise.
            type => $hit->{_type} // 'result',
            body => {
                doc => {
                    defect => \@df_merged,
                },
            },
        );
        $update{body}{doc}{status} = $opts{status} if $opts{status};

        my $res     = $e->update(%update);
        my $outcome = $res->{result} // '';
        if ( $outcome eq 'updated' ) {
            print "Associated cases to document $hit->{_id}\n";
        }
        elsif ( $outcome ne 'noop' ) {
            print
              "Something went wrong associating cases to document $hit->{_id}!\n$outcome\n";
            $failures++;
        }
    }

    print "No cases matching your query could be found. No action was taken.\n"
      unless $attempts;

    return $failures;
}
- 1;
- __END__
- =head1 SPECIAL THANKS
- Thanks to cPanel Inc, for graciously funding the creation of this module.
|