ES 导入 导出

ES 导入 导出

java代码

导入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82

public class Input {

/**
* 文件保存路径
*/
private static final String filePath = "F:\\数据文件\\p1\\p1_10.json";

/**
* 索引名称
*/
private static final String indexName = "dwd-p1";

/**
* 类型名称
*/
private static final String typeName = "dwdata";

public static void main(String[] args) throws UnknownHostException {

Settings settings = Settings.settingsBuilder().put("cluster.name", "hinge-es").build();
TransportClient client = TransportClient.builder().settings(settings).build()
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.1.1"), 9300))
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.1.2"), 9300));

BufferedReader br = null;
try {
br = new BufferedReader(new FileReader(new File(filePath)));

String json;

int count = 0;
long total = 0;

BulkRequestBuilder bulkRequest = client.prepareBulk();

while ((json = br.readLine()) != null) {
count++;
total++;

bulkRequest.add(client.prepareIndex(indexName, typeName).setSource(json));

if (count == 500) {
BulkResponse bulkResponse = bulkRequest.get();
if (bulkResponse.hasFailures()) {
System.err.println("############ 出错了!!!!!");
}

bulkRequest = client.prepareBulk();
count = 0;
System.out.println("已经导入:" + total);
}

}

if (count != 0) {
BulkResponse bulkResponse = bulkRequest.get();
if (bulkResponse.hasFailures()) {
System.err.println("############ 出错了!!!!!");
}

System.out.println("已经导入:" + total);

}

System.out.println("导入结束");

} catch (Exception e) {
e.printStackTrace();
} finally {
if (br != null) {
try {
br.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}

}

}

导出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
public class Output {

/**
* 一批获取数据
*/
private static final int BATCH_SIZE = 10000;

/**
* 文件记录数
*/
private static final int FILE_RECORD = 300000;

/**
* 文件保存路径
*/
private static final String filePath = "F:\\数据文件\\dwd-p1\\";

/**
* 索引名称
*/
private static final String indexName = "dwd-p1";

/**
* 类型名称
*/
private static final String typeName = "dwdata";

public static void main(String[] args) throws UnknownHostException {

Settings settings = Settings.settingsBuilder().put("cluster.name", "hinge-es").build();
TransportClient client = TransportClient.builder().settings(settings).build()
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.1.1"), 9300))
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.1.2"), 9300));

SearchResponse scrollResp = client.prepareSearch(indexName).setTypes(typeName).setQuery(QueryBuilders.matchAllQuery()).setSize(BATCH_SIZE).setScroll(new TimeValue(600000))
.execute().actionGet();

int fileIndex = 0;

String outputFile = getFileName(fileIndex);

BufferedWriter out = null;

long totalSize = 0;

try {

out = new BufferedWriter(new FileWriter(outputFile));

while (true) {
for (SearchHit hit : scrollResp.getHits().getHits()) {

totalSize++;

out.write(hit.getId());// ID
out.write("\r\n");

out.write(hit.getSourceAsString());// 数据
out.write("\r\n");
out.flush();
}

System.out.println("已经导出:" + totalSize);

scrollResp = client.prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(60000)).execute().actionGet();

if (scrollResp.getHits().getHits().length == 0) {
break;
}

if (totalSize > FILE_RECORD) {
out.flush();
out.close();
out = null;

System.out.println(outputFile);
System.out.println("导出结束,总共导出数据:" + totalSize);
totalSize = 0;
fileIndex++;
outputFile = getFileName(fileIndex);
out = new BufferedWriter(new FileWriter(outputFile));
}
}

System.out.println(outputFile);
System.out.println("导出结束,总共导出数据:" + totalSize);

} catch (IOException e) {
e.printStackTrace();
} finally {
if (out != null) {
try {
out.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}

System.out.println("查询结束");
}

private static String getFileName(int fileIndex) {
String outputFile = filePath + indexName + "#" + typeName + "_" + fileIndex + ".json";
return outputFile;
}

}