远程读取elasticSearch数据库并导出数据「建议收藏」

远程读取elasticSearch数据库并导出数据「建议收藏」packageorg.elasticsearch.esTest;importjava.awt.List;importjava.io.BufferedWriter;importjava.io.File;importjava.io.FileWriter;importjava.io.IOException;importjava.util.ArrayList;importjava

大家好,又见面了,我是你们的朋友全栈君。

最近刚开完题,毕设是使用机器学习算法对电磁数据中的异常进行检测。所有的电磁数据都存储在分布式数据库es中,所以第一步需要导出数据,这两天写了点这部分的程序,已经导出部分数据。

package org.elasticsearch.esTest;

import java.awt.List;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.concurrent.ExecutionException;
//maven管理依赖
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;

import org.elasticsearch.index.query.QueryBuilders.*;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.search.SearchHits;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

/** * Hello world! * */
public class EsClient {
    static File trace = new File("E:/es data/emcas-2018.01.04_trace.txt");
    static File warning = new File("E:/es data/emcas-2018.01.04_warning.txt");
    static File other = new File("E:/es data/emcas-2018.01.04_other.txt");

    public static Client  getClient() throws IOException {
         Settings settings = ImmutableSettings.settingsBuilder()
                    .put("cluster.name", "estest1")
                    .build();
            TransportClient client = new TransportClient(settings).addTransportAddress(  
                    new InetSocketTransportAddress("10.10.41.153", 9300));
//          FileWriter fw = new FileWriter(article);
//          BufferedWriter bfw = new BufferedWriter(fw);
            return client; 
    }


   public static HashSet<String> write2File(Client client) throws IOException{
       long start = System.currentTimeMillis();
       int scrollSize = 1000;
       SearchResponse response = null; 
       FileWriter fw_trace1 = new FileWriter(trace);
       BufferedWriter bfw1 = new BufferedWriter(fw_trace1);

       FileWriter fw_warning1 = new FileWriter(warning);
       BufferedWriter bfw2 = new BufferedWriter(fw_warning1);

       FileWriter fw_other1 = new FileWriter(trace);
       BufferedWriter bfw3 = new BufferedWriter(fw_other1);

//     ArrayList<Integer>collectid = new ArrayList<Integer>();
       HashSet collectid = new HashSet();

       int i =0;
       while (response == null || response.getHits().hits().length != 0 && i <=1) {            
//         if(i % 100 == 0){
//             fw = new FileWriter(autoCreateFile(i/10+1));
//             BufferedWriter bfw1 = new BufferedWriter(fw); 
//             bfw = bfw1;
//             System.out.println("这是第"+i/10+"万条数据");
//         }           
           try{
           response = client.prepareSearch("emcas-2017.10.16")
                    .setQuery(QueryBuilders.matchAllQuery())
                    .setSize(scrollSize)                            
                    .setFrom(i*scrollSize)
//                  .setFrom(0)
                    .execute()
                    .actionGet();
           }
           catch (IndexMissingException e) {
             System.out.println("not found");
        }   




           SearchHits hits = response.getHits();

           int trace_count = 0;
           int warning_count =0;
           int other_count = 0;

           for(int j = 0 ; j < hits.getHits().length; j++){ 
   
               String jsonstr = hits.getHits()[j]
                       .getSourceAsString(); 
               JSONObject json_1 = JSON.parseObject(jsonstr);
               System.out.println(json_1);



               if(json_1.get("eventType").equals("trace")){
                   trace_count++;
                   collectid.add(json_1.get("collectorId"));
                   if(trace_count % 100000 == 0){
                       FileWriter fw_trace2 = new FileWriter(autoCreateFile(trace_count/100000));
                       BufferedWriter bfw_trace = new BufferedWriter(fw_trace2);
                       bfw1 = bfw_trace;
                   }
                   bfw1.write(json_1.toString()+'\r');
                   bfw1.flush();
               }else if(json_1.get("eventType").equals("warning")){
                   warning_count++;
                   if(warning_count % 100 == 0){
                       FileWriter fw_warning2 = new FileWriter(autoCreateFile(warning_count/100));
                       BufferedWriter bfw_warning2 = new BufferedWriter(fw_warning2);
                       bfw2 = bfw_warning2;
                   }
                   bfw2.write(json_1.toString()+'\r');
                   bfw2.flush();
               }else{
                   other_count++;
                   if(other_count % 100 == 0){
                       FileWriter fw_other2 = new FileWriter(autoCreateFile(other_count/100));
                       BufferedWriter bfw_other2 = new BufferedWriter(fw_other2);
                       bfw3 = bfw_other2;
                   }
                   bfw3.write(json_1.toString()+'\r');
                   bfw3.flush();
               }
              }                                           
           i++;
       }            
            bfw1.close();
            bfw2.close();
            bfw3.close();
            fw_other1.close();
            fw_trace1.close();
            fw_warning1.close();
            long end = System.currentTimeMillis();
            long totalTime = end - start;
            System.out.println("总耗时:"+totalTime);
            return collectid;
      }



   public static File autoCreateFile(int i ) throws IOException {
       File file = new File("E:/es data/"+i+".txt"); 
       file.createNewFile();
       return file;
   }




    public static void main(String[] args) throws InterruptedException, ExecutionException, IOException {
        EsClient instance = new EsClient();
        Client client = instance.getClient();
        HashSet hashSet = new HashSet();
        hashSet = write2File(client);
        for (Object object : hashSet) {
            System.out.println(object);
        }
        System.out.println(hashSet.size()+"size!!!!!!!!");
//      GetResponse response = client.prepareGet("emcas-2017.10.18","trace","AV8tK5NeSBmsIUk260HQ")
//      GetResponse response = client.prepareGet("emcas-2017.10.18","status","4")
//              .execute()
//              .actionGet(); 
//      System.out.println(response.getSource()); 
//用于计算es数据库中一个index下docs的总记录数
//      SearchResponse response2 = client.prepareSearch("emcas-2018.01.04")
//                  .setQuery(QueryBuilders.matchAllQuery())
//                  .setSize(0)                             
//                  .execute()
//                  .actionGet();
//      SearchHits hits = response2.getHits();
//      long hitscount = hits.getTotalHits();
//      System.out.println(hitscount);
    }
}
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/134374.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

  • JavaScript刷新当前页面的五种方式

    JavaScript刷新当前页面的五种方式转自:原文地址js刷新当前页面的5种方式1、reload:reload方法,该方法强迫浏览器刷新当前页面。语法:location.reload([bForceGet])   参数:bForceGet,可选参数,默认为false,从客户端缓存里取当前页。true,则以GET方式,从服务端取最新的页面,相当于客户端点击F5("刷新")reload()方法用于重…

  • kettle调度监控平台(kettle-scheduler)开源[通俗易懂]

    kettle调度监控平台(kettle-scheduler)开源[通俗易懂]背景Kettle作为用户规模最多的开源ETL工具,强大简洁的功能深受广大ETL从业者的欢迎。但kettle本身的调度监控功能却非常弱。Pentaho官方都建议采用crontab(Unix平台)和计划任务(Windows平台)来完成调度功能。所以大家在实施kettle作业调度功能的时候,通常采用以下几种方式:使用spoon程序来启动Job,使用crontab或计划任务,自主开发java程序来调用k…

    2022年10月17日
  • BestSync同步软件 同步 VM 虚拟机里的Linux系统下Tomcat webapps里的项目

    BestSync同步软件 同步 VM 虚拟机里的Linux系统下Tomcat webapps里的项目

  • dota2连接服务器没有响应,win10系统dota2无法与任何服务器建立连接的解决方法

    dota2连接服务器没有响应,win10系统dota2无法与任何服务器建立连接的解决方法很多小伙伴都遇到过win10系统dota2无法与任何服务器建立连接的情况,想必大家都遇到过win10系统dota2无法与任何服务器建立连接的情况吧,那么应该怎么处理win10系统dota2无法与任何服务器建立连接呢?我们依照1、按下windows+Q组合键打开搜索框,在搜索框中搜索cmd,在搜索结果中我们可以看到命令提示符在命令提示符选项上单击右键,选择【以管理员身份运行】;2、在命令…

  • HTML表格代码_html如何制作表格代码

    HTML表格代码_html如何制作表格代码表格代码<table></tabie><tablewidth(表格宽度。可以用像素或百分比表示。)=””height=””(行高)border=””(边框)cellpadding=””(内容跟单元格边框的边距。)cellspacing=””(单元格之间的间距。)align=””(对齐方式。)bgcolor=””(背景色)background=””(背景图片。)><tr(行)align=””(一行的内容的水平对齐方式)valign(一行的内容的垂平对齐.

  • python批量修改文件名加后缀_python文件重命名

    python批量修改文件名加后缀_python文件重命名python批量修改文件后缀名

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号