elasticsearch数据信息索引操作action support示例分析

2022-07-15,,,,

抽象类分析

action这一部分主要是数据索引)的操作和部分集群信息操作。 所有的请求通过client转发到对应的action上然后再由对应的transportaction来执行相关请求。如果请求能在本机上执行则在本机上执行,否则使用transport进行转发到对应的节点。action support部分是对action的抽象,所有的具体action都继承了support action中的某个类。这里将对这些抽象类进行分析。

这一部分总共分为broadcast(广播),master,nodes,replication及single几个部分。broadcast主要针对一些无具体目标主机的操作,如查询index是否存在,所有继承这个类的action都具有这种类似的性质;nodes主要是对节点的操作,如热点线程查询(hotthread)查询节点上的繁忙线程;replication的子类主要是需要或可以在副本上进行的操作,如索引操作,数据不仅要发送到主shard还要发送到各个副本。single则主要是目标明确的单shard操作,如get操作,根据doc的id取doc,doc 的id能够确定它在哪个shard上,因此操作也在此shard上执行。

doexecute方法

这些support action的实现可以分为两类,第一类就是实现一个内部类作为异步操作器,子类执行doexecute时,初始化该操作器并启动。另外一种就是直接实现一个方法,子类doexecute方法调用该方法进行。transportbroadcastoperationaction就属于前者,它实现了内部操作器asyncbroadcastaction。transportcountaction继承于它,它doexecute方法如下所示:

@override
    protected void doexecute(countrequest request, actionlistener<countresponse> listener) {
        request.nowinmillis = system.currenttimemillis();
        super.doexecute(request, listener);
    }

调用父类的doexecute方法,也就是transportbroadcastoperationaction的方法,它的实现如下所示:

@override
    protected void doexecute(request request, actionlistener&lt;response&gt; listener) {
        new asyncbroadcastaction(request, listener).start();
    }

可以看到它初始化了asyncbroadcastaction并启动。asyncbroadcastaction只是确定了操作的流程,及操作完成如何返回response,并未涉及到具体的操作逻辑。因为这些逻辑都在每个子action中实现,不同的action需要进行不同的操作。如count需要count每个shard并且返回最后的总数值,而indexexistaction则需要对比所有索引查看查询的索引是否存在。start方法的代码如下所示:

public void start() {
      //没有shards
            if (shardsits.size() == 0) {
                // no shards
                try {
                    listener.onresponse(newresponse(request, new atomicreferencearray(0), clusterstate));
                } catch (throwable e) {
                    listener.onfailure(e);
                }
                return;
            }
            request.beforestart();
            // count the local operations, and perform the non local ones
            int shardindex = -1;
       //遍历对每个shards进行操作
            for (final sharditerator shardit : shardsits) {
                shardindex++;
                final shardrouting shard = shardit.nextornull();
                if (shard != null) {
                    performoperation(shardit, shard, shardindex);
                } else {
                    // really, no shards active in this group
                    onoperation(null, shardit, shardindex, new noshardavailableactionexception(shardit.shardid()));
                }
            }
        }

start方法就是遍历所有shards,如果shard存在则执行performoperation方法,在这个方法中会区分该请求能否在本机上进行,能执行则调用shardoperation方法得到结果。这个方法在这是抽象的,每个子类都有实现。否则发送到对应的主机上。,如果shard为null则进行onoperation操作,遍历该shard的其它副本看能否找到可以操作的shard。

performoperation代码

如下所示:

protected void performoperation(final sharditerator shardit, final shardrouting shard, final int shardindex) {
            if (shard == null) {//shard 为null抛出异常
                // no more active shards... (we should not really get here, just safety)
                onoperation(null, shardit, shardindex, new noshardavailableactionexception(shardit.shardid()));
            } else {
                try {
                    final shardrequest shardrequest = newshardrequest(shardit.size(), shard, request);
                    if (shard.currentnodeid().equals(nodes.localnodeid())) {//shard在本地执行shardoperation方法,并通过onoperation方法封装结果
                        threadpool.executor(executor).execute(new runnable() {
                            @override
                            public void run() {
                                try {
                                    onoperation(shard, shardindex, shardoperation(shardrequest));
                                } catch (throwable e) {
                                    onoperation(shard, shardit, shardindex, e);
                                }
                            }
                        });
                    } else {//不是本地shard,发送到对应节点。
                        discoverynode node = nodes.get(shard.currentnodeid());
                        if (node == null) {
                            // no node connected, act as failure
                            onoperation(shard, shardit, shardindex, new noshardavailableactionexception(shardit.shardid()));
                        } else {
                            transportservice.sendrequest(node, transportshardaction, shardrequest, new basetransportresponsehandler&lt;shardresponse&gt;() {
                                @override
                                public shardresponse newinstance() {
                                    return newshardresponse();
                                }
                                @override
                                public string executor() {
                                    return threadpool.names.same;
                                }
                                @override
                                public void handleresponse(shardresponse response) {
                                    onoperation(shard, shardindex, response);
                                }
                                @override
                                public void handleexception(transportexception e) {
                                    onoperation(shard, shardit, shardindex, e);
                                }
                            });
                        }
                    }
                } catch (throwable e) {
                    onoperation(shard, shardit, shardindex, e);
                }
            }
        }

方法shardoperation在counttransportaction的实现如下所示:

@override
    protected shardcountresponse shardoperation(shardcountrequest request) throws elasticsearchexception {
        indexservice indexservice = indicesservice.indexservicesafe(request.shardid().getindex());//
        indexshard indexshard = indexservice.shardsafe(request.shardid().id());
    //构造查询context
        searchshardtarget shardtarget = new searchshardtarget(clusterservice.localnode().id(), request.shardid().getindex(), request.shardid().id());
        searchcontext context = new defaultsearchcontext(0,
                new shardsearchlocalrequest(request.types(), request.nowinmillis(), request.filteringaliases()),
                shardtarget, indexshard.acquiresearcher("count"), indexservice, indexshard,
                scriptservice, cacherecycler, pagecacherecycler, bigarrays, threadpool.estimatedtimeinmilliscounter());
        searchcontext.setcurrent(context);
        try {
            // todo: min score should move to be "null" as a value that is not initialized...
            if (request.minscore() != -1) {
                context.minimumscore(request.minscore());
            }
            bytesreference source = request.querysource();
            if (source != null &amp;&amp; source.length() &gt; 0) {
                try {
                    queryparsecontext.settypes(request.types());
                    context.parsedquery(indexservice.queryparserservice().parsequery(source));
                } finally {
                    queryparsecontext.removetypes();
                }
            }
            final boolean hasterminateaftercount = request.terminateafter() != default_terminate_after;
            boolean terminatedearly = false;
            context.preprocess();
            try {
                long count;
                if (hasterminateaftercount) {//调用lucene的封装接口执行查询并返回结果
                    final lucene.earlyterminatingcollector countcollector =
                            lucene.createcountbasedearlyterminatingcollector(request.terminateafter());
                    terminatedearly = lucene.countwithearlytermination(context.searcher(), context.query(), countcollector);
                    count = countcollector.count();
                } else {
                    count = lucene.count(context.searcher(), context.query());
                }
                return new shardcountresponse(request.shardid(), count, terminatedearly);
            } catch (exception e) {
                throw new queryphaseexecutionexception(context, "failed to execute count", e);
            }
        } finally {
            // this will also release the index searcher
            context.close();
            searchcontext.removecurrent();
        }
    }

可以看到这里是每个action真正的逻辑实现。因为这里涉及到index部分的内容,这里就不详细分析。后面关于index的分析会有涉及。这就是support action中的第一种实现。

master的相关操作

第二种就master的相关操作,因此没有实现对应的操作类,而只是实现了一个方法。该方法的作用跟操作器作用相同,唯一的不同是它没有操作器这么多的变量, 而且它不是异步的。master的操作需要实时进行,执行过程中需要阻塞某些操作,保证集群状态一致性。这里就不再说明,请参考transportmasternodeoperationaction原码。

总结

本篇概括说了support action,并以counttransportaction为例说明了support action中的异步操作器实现,最后简单的分析了master的同步操作。因为这里涉及到很多action不可能一一分析,有兴趣可以参考对应的代码。而且这里有以下index部分的内容,所以没有更深入的分析。在后面分析完index的相关功能后,会挑出几个重要的action做详细分析。

以上就是elasticsearch数据信息索引操作action support示例分析的详细内容,更多关于elasticsearch数据信息索引操作action support的资料请关注其它相关文章!

《elasticsearch数据信息索引操作action support示例分析.doc》

下载本文的Word格式文档,以方便收藏与打印。