远程读取elasticSearch数据库并导出数据「建议收藏」

远程读取elasticSearch数据库并导出数据「建议收藏」packageorg.elasticsearch.esTest;importjava.awt.List;importjava.io.BufferedWriter;importjava.io.File;importjava.io.FileWriter;importjava.io.IOException;importjava.util.ArrayList;importjava

大家好,又见面了,我是你们的朋友全栈君。

最近刚开完题,毕设是使用机器学习算法对电磁数据中的异常进行检测。所有的电磁数据都存储在分布式数据库es中,所以第一步需要导出数据,这两天写了点这部分的程序,已经导出部分数据。

package org.elasticsearch.esTest;
import java.awt.List;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.concurrent.ExecutionException;
//maven管理依赖
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.index.query.QueryBuilders.*;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.search.SearchHits;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
/** * Hello world! * */
public class EsClient {
static File trace = new File("E:/es data/emcas-2018.01.04_trace.txt");
static File warning = new File("E:/es data/emcas-2018.01.04_warning.txt");
static File other = new File("E:/es data/emcas-2018.01.04_other.txt");
public static Client  getClient() throws IOException {
Settings settings = ImmutableSettings.settingsBuilder()
.put("cluster.name", "estest1")
.build();
TransportClient client = new TransportClient(settings).addTransportAddress(  
new InetSocketTransportAddress("10.10.41.153", 9300));
//          FileWriter fw = new FileWriter(article);
//          BufferedWriter bfw = new BufferedWriter(fw);
return client; 
}
public static HashSet<String> write2File(Client client) throws IOException{
long start = System.currentTimeMillis();
int scrollSize = 1000;
SearchResponse response = null; 
FileWriter fw_trace1 = new FileWriter(trace);
BufferedWriter bfw1 = new BufferedWriter(fw_trace1);
FileWriter fw_warning1 = new FileWriter(warning);
BufferedWriter bfw2 = new BufferedWriter(fw_warning1);
FileWriter fw_other1 = new FileWriter(trace);
BufferedWriter bfw3 = new BufferedWriter(fw_other1);
//     ArrayList<Integer>collectid = new ArrayList<Integer>();
HashSet collectid = new HashSet();
int i =0;
while (response == null || response.getHits().hits().length != 0 && i <=1) {            
//         if(i % 100 == 0){
//             fw = new FileWriter(autoCreateFile(i/10+1));
//             BufferedWriter bfw1 = new BufferedWriter(fw); 
//             bfw = bfw1;
//             System.out.println("这是第"+i/10+"万条数据");
//         }           
try{
response = client.prepareSearch("emcas-2017.10.16")
.setQuery(QueryBuilders.matchAllQuery())
.setSize(scrollSize)                            
.setFrom(i*scrollSize)
//                  .setFrom(0)
.execute()
.actionGet();
}
catch (IndexMissingException e) {
System.out.println("not found");
}   
SearchHits hits = response.getHits();
int trace_count = 0;
int warning_count =0;
int other_count = 0;
for(int j = 0 ; j < hits.getHits().length; j++){ 

String jsonstr = hits.getHits()[j]
.getSourceAsString(); 
JSONObject json_1 = JSON.parseObject(jsonstr);
System.out.println(json_1);
if(json_1.get("eventType").equals("trace")){
trace_count++;
collectid.add(json_1.get("collectorId"));
if(trace_count % 100000 == 0){
FileWriter fw_trace2 = new FileWriter(autoCreateFile(trace_count/100000));
BufferedWriter bfw_trace = new BufferedWriter(fw_trace2);
bfw1 = bfw_trace;
}
bfw1.write(json_1.toString()+'\r');
bfw1.flush();
}else if(json_1.get("eventType").equals("warning")){
warning_count++;
if(warning_count % 100 == 0){
FileWriter fw_warning2 = new FileWriter(autoCreateFile(warning_count/100));
BufferedWriter bfw_warning2 = new BufferedWriter(fw_warning2);
bfw2 = bfw_warning2;
}
bfw2.write(json_1.toString()+'\r');
bfw2.flush();
}else{
other_count++;
if(other_count % 100 == 0){
FileWriter fw_other2 = new FileWriter(autoCreateFile(other_count/100));
BufferedWriter bfw_other2 = new BufferedWriter(fw_other2);
bfw3 = bfw_other2;
}
bfw3.write(json_1.toString()+'\r');
bfw3.flush();
}
}                                           
i++;
}            
bfw1.close();
bfw2.close();
bfw3.close();
fw_other1.close();
fw_trace1.close();
fw_warning1.close();
long end = System.currentTimeMillis();
long totalTime = end - start;
System.out.println("总耗时:"+totalTime);
return collectid;
}
public static File autoCreateFile(int i ) throws IOException {
File file = new File("E:/es data/"+i+".txt"); 
file.createNewFile();
return file;
}
public static void main(String[] args) throws InterruptedException, ExecutionException, IOException {
EsClient instance = new EsClient();
Client client = instance.getClient();
HashSet hashSet = new HashSet();
hashSet = write2File(client);
for (Object object : hashSet) {
System.out.println(object);
}
System.out.println(hashSet.size()+"size!!!!!!!!");
//      GetResponse response = client.prepareGet("emcas-2017.10.18","trace","AV8tK5NeSBmsIUk260HQ")
//      GetResponse response = client.prepareGet("emcas-2017.10.18","status","4")
//              .execute()
//              .actionGet(); 
//      System.out.println(response.getSource()); 
//用于计算es数据库中一个index下docs的总记录数
//      SearchResponse response2 = client.prepareSearch("emcas-2018.01.04")
//                  .setQuery(QueryBuilders.matchAllQuery())
//                  .setSize(0)                             
//                  .execute()
//                  .actionGet();
//      SearchHits hits = response2.getHits();
//      long hitscount = hits.getTotalHits();
//      System.out.println(hitscount);
}
}
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/134374.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

  • Visdom使用

    Visdom使用这里写自定义目录标题欢迎使用Markdown编辑器新的改变功能快捷键合理的创建标题,有助于目录的生成如何改变文本的样式插入链接与图片如何插入一段漂亮的代码片生成一个适合你的列表创建一个表格设定内容居中、居左、居右SmartyPants创建一个自定义列表如何创建一个注脚注释也是必不可少的KaTeX数学公式新的甘特图功能,丰富你的文章UML图表FLowchart流程图导出与导入导出导入欢迎使用Ma…

  • 8、Cocos2dx 3.0三,找一个小游戏开发3.0存储器管理的版本号

    8、Cocos2dx 3.0三,找一个小游戏开发3.0存储器管理的版本号

  • 敏捷测试的特点_敏捷测试流程特点是

    敏捷测试的特点_敏捷测试流程特点是敏捷测试的特点敏捷测试就是符合敏捷宣言思想,遵守敏捷开发原则,在敏捷开发环境下能够很好地和其整体开发流程融合的一系列的测试实践,这些实践具有鲜明的敏捷开发的特征,如TDD、ATDD、结对编程、持续测试等。和传统测试的区分,可以概括如下:1)传统测试更强调测试的独立性,将“开发人员”和“测试人员”角色分得比较清楚。而敏捷测试可以有专职的测试人员,也可以是全民测试,即在敏捷测试中,可以没有“测试人员”

    2022年10月29日
  • MIPI协议知识

    MIPI协议知识转发路径:https://blog.csdn.net/weixin_41842559/article/details/109828013?spm=1001.2101.3001.6650.14&utm_medium=distribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromBaidu%7ERate-14.pc_relevant_paycolumn_v3&depth_1-utm_source=distribute.pc_

  • 如何安装ps的滤镜插件

    如何安装ps的滤镜插件1.首先在网上搜索并下载一个滤镜插件,这里以磨皮插件Portraiture为例。2.这里是我下载的ps插件。3.找到ps,右击,选择“打开文件所在位置”,找到“plug-ins”文件夹,将刚才下载的文件粘贴进去就可以了。这里需要注意,如果电脑是64位的安装后面带64的,反之亦然。4.重启ps,你会发现在滤镜下方多出来一个插件,点击即可以进入。图片只是一个示例,当然不是给这种图片磨…

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号