王某某的笔记

记录我的编程之路

Elasticsearch HTTP API

https://www.elastic.co/guide/en/elasticsearch/reference/current/getting-started.html

文档

Index API

添加或更新JSON文档到指定的索引

1
2
3
4
5
curl -XPUT 'http://192.168.1.213:9200/twitter/tweet/1' -d '{
"user" : "kimchy",
"post_date" : "2009-11-15T14:12:12",
"message" : "trying out Elasticsearch"
}'

Get API

从索引中获取JSON文档根据ID

1
2
curl -XGET 'http://192.168.1.213:9200/twitter/tweet/1'

Delete API

从索引中删除JSON文档根据ID

1
2
curl -XDELETE 'http://192.168.1.213:9200/twitter/tweet/1'

查询

请求体查询

1
2
3
4
5
6
curl -XGET 'http://192.168.1.213:9200/twitter/tweet/_search' -d '{
"query" : {
"term" : { "user" : "kimchy" }
}
}'

URI查询

1
curl -XGET 'http://192.168.1.213:9200/twitter/tweet/_search?q=user:kimchy'

Query DSL

https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html#query-dsl

Elasticsearch提供了基于JSON来定义查询完整的查询DSL。查询DSL看作查询的AST,由两种类型的子句:

  • 叶查询子句

    叶查询子句寻找一个特定的值在某一特定领域,如匹配(match),期限(term)或范围(range)查询。这些查询可以自行使用。

  • 复合查询子句

    复合查询包含其他叶查询或复合的查询,并用于多个查询以逻辑方式(如 bool 或 dis_max 查询)结合,或者改变它们的行为(如 not 或 constant_score 查询)

聚合

取出现次数最多的前20个

1
2
3
4
5
6
7
8
9
10
11
12
POST http://192.168.1.91:9200/_search?search_type=count 

{
"aggs": {
"topLocation": {
"terms": {
"field": "location.raw",
"size": 20
}
}
}
}
查询结果高亮

只有针对字段查询的时候才会高亮,对 _all 进行查询时是不会高亮的。

如:

1
2
3
4
5
6
7
8
9
10
11
12
{
"query": {
"match": {
"commandReply": "USER ftpuser"
}
},
"highlight": {
"fields": {
"commandReply": {}
}
}
}

结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
...
...
"frontNodeId": 0,
"routeIp": "192.168.1.1"
}
},
"highlight": {
"commandReply": [
"220 (vsFTPd 3.0.2) <em>USER</em> <em>ftpuser</em> 331 Please specify the password. PASS <em>ftpuser</em> 230 Login"
,
" successful. ### ### Login successful UserName: <em>ftpuser</em> Password: <em>ftpuser</em> ### SYST 215 UNIX Type: L8"
]
}
}

集群

Cluster Health

获取简单的集群状态

1
curl -XGET 'http://192.168.1.213:9200/_cluster/health?pretty=true'

该API还可以针对一个或多个索引执行以只获得指定索引的健康状态

1
curl -XGET 'http://192.168.1.213:9200/_cluster/health/wwh_test?pretty'

Cluster State

群集状态API允许获得整个集群的综合状态信息

1
2
3
4
5
6
curl -XGET 'http://192.168.1.213:9200/_cluster/state'
curl -XGET 'http://192.168.1.213:9200/_cluster/stats?human&pretty'

#过滤
curl -XGET 'http://localhost:9200/_cluster/state/{metrics}/{indices}'


Cat API

  • 查看主节点

http://192.168.1.213:9200/_cat/master?v

curl ‘192.168.1.213:9200/_cat/master?v’

  • 查看集群是否健康

http://192.168.1.213:9200/_cat/health?v

curl ‘192.168.1.213:9200/_cat/health?v’

  • 查看节点列表

http://192.168.1.213:9200/_cat/nodes?v

curl ‘192.168.1.213:9200/_cat/nodes?v’

  • 列出所有索引

http://192.168.1.213:9200/_cat/indices?v

curl ‘192.168.1.213:9200/_cat/indices?v’

CentOS7 安装 Nginx

Nginx是一个高性能的Web服务器软件。它是一个比Apache HTTP服务器更灵活和轻量级的程序。

安装

  1. 安装epel源
    EPEL (Extra Packages for Enterprise Linux,企业版Linux的额外软件包) 是Fedora小组维护的一个软件仓库项目,为RHEL/CentOS提供他们默认不提供的软件包。这个源兼容RHEL及像CentOS和Scientific Linux这样的衍生版本。
    我们可以很容易地通过yum命令从EPEL源上获取上万个在CentOS自带源上没有的软件。EPEL提供的软件包大多基于其对应的Fedora软件包,不会与企业版Linux发行版本的软件发生冲突或替换其文件。
    1
    2
    3
    4
    5
    yum install epel-release
    #安装完成之后可以通过下面的命令查看
    yum repolist
    #可以看到多了一个
    #Extra Packages for Enterprise Linux 7 - x86_64
  2. 安装Nginx
    1
    yum install nginx
  3. 启动Nginx
    1
    systemctl start nginx
    如果运行防火墙,请运行以下命令以允许HTTP和HTTPS流量:
    1
    2
    3
    sudo firewall-cmd --permanent --zone=public --add-service=http 
    sudo firewall-cmd --permanent --zone=public --add-service=https
    sudo firewall-cmd --reload
  4. 访问
    在浏览器中访问
    1
    2
    http://server_domain_name_or_IP/
    http://192.168.1.213/
    能看Nginx的默认页
  5. 系统启动时启动Nginx
    1
    systemctl enable nginx
  6. 配置
    默认服务器根
    默认服务器根目录是 /usr/share/nginx/html 放置在其中的文件将在您的Web服务器上提供访问。
    此位置在Nginx附带的默认服务器块配置文件中指定,该文件位于 /etc/nginx/conf.d/default.conf
    服务器块配置
    可以通过在/etc/nginx/conf.d中创建新的配置文件来添加任何附加的服务器块(在Apache中称为虚拟主机)。当Nginx启动时,将加载该目录中以.conf结尾的文件。
    Nginx全局配置
    主要的Nginx配置文件位于/etc/nginx/nginx.conf。在这里您可以更改设置,如运行Nginx守护进程的用户,以及当Nginx运行时生成的工作进程数等。


忘了之前是怎么安装的了 ~~

服务名是:

nginx16-nginx.service

配置文件位于:

/opt/rh/nginx16/root/etc/nginx/nginx.conf

内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
error_log  /var/log/nginx16/error.log;

......

access_log /var/log/nginx16/access.log main;

......

server {
listen 80;
server_name localhost;

#charset koi8-r;

#access_log /opt/rh/nginx16/root/var/log/nginx/host.access.log main;

location / {
#root /opt/rh/nginx16/root/usr/share/nginx/html;
#index index.html index.htm;

root /data/bdmi/html;
index login.html index.html index.htm;

}


location /baseUrl {
rewrite ^.+baseUrl/?(.*)$ /bdmi-biz/$1 break;
include uwsgi_params;
proxy_pass http://192.168.1.92:8080;

proxy_set_header Cookie $http_cookie;
proxy_cookie_path /bdmi-biz /;

proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
}

......

关于SELINUX问题

首先查看本机SELinux的开启状态,如果SELinux status参数为enabled即为开启状态

1
/usr/sbin/ sestatus -v

或者使用getenforce命令检查

SELinux can be run in enforcing, permissive, or disabled mode

To add httpd_t to the list of permissive domains, run this command:

1
# semanage permissive -a httpd_t

To delete httpd_t from the list of permissive domains, run:

1
2
# semanage permissive -d httpd_t

To set the mode globally to enforcing, run:

1
2
# setenforce 1

Tomcat 安装

部署

如在CentOS 7上部署Tomcat 8:

1
2
3
4
5
6
7
8
# 下载
wget http://apache.fayea.com/tomcat/tomcat-8/v8.5.4/bin/apache-tomcat-8.5.4.tar.gz

# 解压
tar -zxvf apache-tomcat-8.5.4.tar.gz

# 创建一个软连接
ln -s apache-tomcat-8.5.4 tomcat

配置

修改默认端口
1
vi tomcat/conf/server.xml 
<Connector port="8080" protocol="HTTP/1.1"  connectionTimeout="20000"
    redirectPort="8443" />

配置默认项目

1
vi tomcat/conf/server.xml

在Host 标签内加入如下代码段,docBase属性指定项目名称

<Context path="" docBase="XXXX"  reloadable="true" crossContext="true">  
</Context>  

重新启动Tomcat,在浏览器下输入http://localhost:8080,即可看到 XXXX 的首页

StatSVN介绍

StatSVN是一个Java写的开源代码统计程序,从statCVS移植而来,能够从Subversion版本库中取得信息,然后生成描述项目开发的各种表格和图表。比如:代码行数的时间线;针对每个开发者的代码行数;开发者的活跃程度;开发者最近所提交的;文件数量;平均文件大小;最大文件;哪个文件是修改最多次数的;目录大小;带有文件数量和代码行数的Repository tree。StatSVN当前版本能够生成一组包括表格与图表的静态HTML文档。

StatSVN 使用条件

如前所述,StatSVN是一个Java写的开源代码统计程序,是从Subversion版本库中取得信息的,所以使用StatSVN有两个限制。

  1. 需要安装Java的运行环境(Java Runtime Environment)
  2. 需要使用svn客户端,必须保证本机的svn客户端命令可用

StatSVN 使用方法

使用之前需要先下载StatSVN:http://www.statsvn.org/downloads.html

checkout 工作目录

将需要统计的svn路径下的代码checkout到本地工作目录里,版本可以自由选择,如果你要统计某个版本下的代码量checkout出对应的版本即可,如果需要统计最近的版本时的代码量,checkout最新版本。

生成log文件

使用StatSVN统计代码量时需要使用log文件,生成log文件方法:

命令行下进入工作目录后:svn log -v –xml > logfile.log

使用StatSVN统计SVN中的代码量

将下载好的StatSVN解压,得到statsvn.jar文件,在命令行里执行命令

1
java -jar statsvn.jar C:\project\logfile.log C:\project

这里的C:\project\logfile.log是前一步生成的log文件,C:\project是工作目录。

执行完后,就在当前目录下生成了对应的html结果文档。

命令介绍

格式:

1
2
java -jar statsvn.jar [options] <logfile> <checked-out-module>

参数

1
<logfile>

为前一步中生成的svn log文件

1
<checked-out-module>

为checkout工作拷贝目录,注意两个参数都要列出正确的全路径,否则会提示错误如logfile.log找不到等等

实际上使用的SH脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#!/bin/sh

#
# 用于统计深暗项目的代码情况
#

#进入SVN检出目录
cd /data/statsvn/project/xxx/xxx-parent


#SVN更新
svn update

#生成log文件
svn log -v --xml > ../xxx-svn.log

#/data/statsvn/project/xxx/xxx-svn.log
#/data/statsvn/statsvn-0.7.0/statsvn.jar

#进入到输出目录
cd /data/statsvn/report/xxx

#删除全部内容
rm -rf *


#生成html文件


#全部后台相关
#java -jar /data/statsvn/statsvn-0.7.0/statsvn.jar /data/statsvn/project/xxx/xxx-svn.log /data/statsvn/project/xxx/xxx-parent -include **/*.java:**/*.xml:**/*.properties:**/*.conf


#包括前后台的
java -jar /data/statsvn/statsvn-0.7.0/statsvn.jar /data/statsvn/project/xxx/xxx-svn.log /data/statsvn/project/xxx/xxx-parent -exclude **/jquery*.js:**/bootstrap*.css:**/bootstrap*.js:**/d3.*:**/echarts.*:**
/webapp/fonts/*

配合Apache,将该脚本定时执行就能得到最新的统计结果

Elasticsearch JDBC 导入器

通过Java数据库连接(JDBC)从JDBC源获取数据导入到Elasticsearch中。

项目地址:
https://github.com/jprante/elasticsearch-jdbc

问题

使用 1.7.0_80 版本的JDK报错:Unsupported major.minor version 52.0

换成 1.8.0_101 版本的JDK之后就可以了。
据说是JDK本身的一个问题。

操作过程

简单的记录操作过程,详情见github。

  1. 下载

    1
    wget http://xbib.org/repository/org/xbib/elasticsearch/importer/elasticsearch-jdbc/2.3.4.0/elasticsearch-jdbc-2.3.4.0-dist.zip
  2. 解压缩

    1
    unzip elasticsearch-jdbc-2.3.4.0-dist.zip 
  3. 确定JDBC驱动jar
    检查lib目录是否有你需要的jdbc驱动jar,如果没有需要将相关jar放到该目录中。

  4. 编写一个导入脚本

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    #!/bin/sh
    DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
    bin=${DIR}/../bin
    lib=${DIR}/../lib
    echo '{
    "type" : "jdbc",
    "jdbc" : {
    "url" : "jdbc:mysql://192.168.1.212:3306/hxx",
    "user" : "root",
    "password" : "root",
    "sql" : "SELECT *, id as _id FROM xxtable",
    "index" : "test",
    "type" : "rt1",
    "metrics": {
    "enabled" : true
    },
    "elasticsearch" : {
    "cluster" : "wwh_es_cluster",
    "host" : "192.168.1.213",
    "port" : 9300
    }
    }
    }' | java \
    -cp "${lib}/*" \
    -Dlog4j.configurationFile=${bin}/log4j2.xml \
    org.xbib.tools.Runner \
    org.xbib.tools.JDBCImporter
    ~
  5. 给脚本添加执行权限然后执行

与Elasticsearch交互

Java API

Elasticsearch为Java用户提供了两种内置客户端:

节点客户端(node client):

节点客户端以无数据节点(none data node)身份加入集群,换言之,它自己不存储任何数据,但是它知道数据在集群中的具体位置,并且能够直接转发请求到对应的节点上。

传输客户端(Transport client):

这个更轻量的传输客户端能够发送请求到远程集群。它自己不加入集群,只是简单转发请求给集群中的节点。
两个Java客户端都通过9300端口与集群交互,使用Elasticsearch传输协议(Elasticsearch Transport Protocol)。集群中的节点之间也通过9300端口进行通信。如果此端口未开放,你的节点将不能组成集群。

TIP
Java客户端所在的Elasticsearch版本必须与集群中其他节点一致,否则,它们可能互相无法识别。

关于Java API的更多信息请查看相关章节:Java API


基于HTTP协议,以JSON为数据交互格式的RESTful API

其他所有程序语言都可以使用RESTful API,通过9200端口的与Elasticsearch进行通信,你可以使用你喜欢的WEB客户端,事实上,如你所见,你甚至可以通过curl命令与Elasticsearch通信。

NOTE
Elasticsearch官方提供了多种程序语言的客户端——Groovy,Javascript, .NET,PHP,Perl,Python,以及 Ruby——还有很多由社区提供的客户端和插件,所有这些可以在文档中找到。

向Elasticsearch发出的请求的组成部分与其它普通的HTTP请求是一样的:

1
curl -X<VERB> '<PROTOCOL>://<HOST>:<PORT>/<PATH>?<QUERY_STRING>' -d '<BODY>'
  • VERB HTTP方法:GET, POST, PUT, HEAD, DELETE
  • PROTOCOL http或者https协议(只有在Elasticsearch前面有https代理的时候可用)
  • HOST Elasticsearch集群中的任何一个节点的主机名,如果是在本地的节点,那么就叫localhost
  • PORT Elasticsearch HTTP服务所在的端口,默认为9200
  • PATH API路径(例如_count将返回集群中文档的数量),PATH可以包含多个组件,例如_cluster/stats或者_nodes/stats/jvm
  • QUERY_STRING 一些可选的查询请求参数,例如==?pretty==参数将使请求返回更加美观易读的JSON数据
  • BODY 一个JSON格式的请求主体(如果请求需要的话)

举例说明,为了计算集群中的文档数量,我们可以这样做:

1
2
3
4
5
6
7
curl -XGET 'http://localhost:9200/_count?pretty' -d '
{
"query": {
"match_all": {}
}
}
'

Elasticsearch返回一个类似200 OK的HTTP状态码和JSON格式的响应主体(除了HEAD请求)。上面的请求会得到如下的JSON格式的响应主体:

1
2
3
4
5
6
7
8
{
"count" : 0,
"_shards" : {
"total" : 5,
"successful" : 5,
"failed" : 0
}
}

我们看不到HTTP头是因为我们没有让 curl 显示它们,如果要显示,使用 curl 命令后跟 -i 参数:

1
curl -i -XGET 'localhost:9200/'

Multi-fields Reindex

1. 先获取索引

1
get http://192.168.1.51:9200/xxd/_mapping/p1  

得到一个JSON,修改这个JSON

官方文档地址:
https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-get-mapping.html

2. 修改索引

官网的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
PUT my_index
{
"mappings": {
"my_type": {
"properties": {
"city": {
"type": "text",
"fields": {
"raw": {
"type": "keyword"
}
}
}
}
}
}
}

如果直接照上面的改会报错:no handler for type [keyword] declared on field [raw]
keyword是ES 5中的一个新类型,ES 2.X不支持
这里实际上修改为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
 "currentCompany": {
"type": "string",
"fields": {
"raw": {
"type": "string",
"index": "not_analyzed"
}
}
},

"headline": {
"type": "string",
"fields": {
"raw": {
"type": "string",
"index": "not_analyzed"
}
}
},

"industry": {
"type": "string",
"fields": {
"raw": {
"type": "string",
"index": "not_analyzed"
}
}
},

"location": {
"type": "string",
"fields": {
"raw": {
"type": "string",
"index": "not_analyzed"
}
}
},

删掉第一级别的==xxd== ,保留如下:

1
2
3
4
{
"mappings": {
"p1": {
......

3. 添加新的类型到索引中

根据上面修改的JSON创建一个新的类型

1
2
3
4
5
6
put http://192.168.1.51:9200/xxd2
{
"mappings": {
"p1": {
"properties": {
......

官方文档地址:
https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-put-mapping.html

4. 通过get验证新创建的索引

1
get http://192.168.1.51:9200/xxd2/_mapping/p1  

5. Reindex API

官网地址:
https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html#docs-reindex
reindex API是新的,应该仍然被认为是实验性的。 API可能以不向后兼容的方式更改。
重建索引不会尝试设置目标索引。它不复制源索引的设置。您应该在运行_reindex操作之前设置目标索引,包括设置映射,分片计数,副本等。

1
2
3
4
5
6
7
8
9
POST http://192.168.1.51:9200/_reindex
{
"source": {
"index": "xxd"
},
"dest": {
"index": "xxd2"
}
}

6. 测试

1
2
3
4
5
6
7
8
9
10
POST http://192.168.1.51:9200/xxd2/p1/_search?search_type=count 
{
"aggs": {
"colors": {
"terms": {
"field": "location.raw"
}
}
}
}

ES全文检索-高亮

全文检索
高亮
统计各个索引命中情况

http://192.168.1.1:9200/dwd-p1,dnd*/_search/

POST

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

{
"query": {
"match": {
"_all": "pro duct"
}
},
"highlight": {
"require_field_match": false,
"fields": {
"*": {}
}
},
"aggs": {
"indexCount": {
"terms": {
"field": "_index"
}
}
}
}

java 代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118


logger.debug("查询字符串:{}", queryStr);

// 查询索引
SearchRequestBuilder search = client.prepareSearch(p1IndexName, darknetIndexPrefix + "*");// 查询全部的类型

QueryStringQueryBuilder qs = new QueryStringQueryBuilder(queryStr);
// 最匹配的在前
qs.useDisMax(true);
search.setQuery(qs);

// 高亮所有的字段
search.addHighlightedField("*");

// 所有的字段都进行高亮,而不单单只包含查询匹配的字段
search.setHighlighterRequireFieldMatch(false);

// 分页
int start = (page.getPageNumber() - 1) * page.getPageSize();
search.setFrom(start).setSize(page.getPageSize());

// 统计各个索引的命中情况
search.addAggregation(AggregationBuilders.terms("by_index").field("_index"));

SearchResponse response = search.get();

List<JSONObject> jsonsList = new ArrayList<>();
int length = darknetIndexPrefix.length();
// 将HIT转对象
JSONObject json;
for (SearchHit hit : response.getHits()) {
json = new JSONObject();
String index = hit.getIndex();
String type = hit.getType();
// 大分类
// 小分类
if (p1IndexName.equals(index)) {
json.put("class", "xx数据");
json.put("subclass", xx type);
} else {
json.put("class", "xx2数据");
json.put("subclass", index.substring(length));
}

json.put("index", index);// 索引
json.put("type", type);// 类型
json.put("id", hit.getId());// ID
json.put("score", hit.getScore()); // 得分

// 高亮
JSONArray highlightArray = new JSONArray();
json.put("highlight", highlightArray);

JSONObject fieldJsonObj;
JSONArray _jsonArray;

Map<String, HighlightField> highlightMap = hit.getHighlightFields();
// 如果匹配到了非字符串字段,这个可能为空
// TODO 应该需要进行另外的处理

for (Map.Entry<String, HighlightField> entry : highlightMap.entrySet()) {
fieldJsonObj = new JSONObject();
_jsonArray = new JSONArray();

fieldJsonObj.put(entry.getKey(), _jsonArray);

HighlightField hlfield = entry.getValue();
Text[] texts = hlfield.getFragments();
for (Text text : texts) {
_jsonArray.add(text.toString());
}

highlightArray.add(fieldJsonObj);
}

String dss = hit.sourceAsString();// json格式的数据类容
JSONObject ct = JSON.parseObject(dss, JSONObject.class);
json.put("source", ct);

jsonsList.add(json);
}

/**
* 获取命中情况
*/
// 获取聚合结果
Terms tos = response.getAggregations().get("by_index");

JSONArray jsonArray = new JSONArray();

for (Terms.Bucket bucket : tos.getBuckets()) {

JSONObject hitJson = new JSONObject();

String index = bucket.getKey().toString();

// 大分类
// 小分类
if (p1IndexName.equals(index)) {
hitJson.put("class", "xx数据");
hitJson.put("subclass", "p1");
} else {
hitJson.put("class", "xx2数据");
hitJson.put("subclass", index.substring(length));
}

hitJson.put("hitCount", bucket.getDocCount());

jsonArray.add(hitJson);
}

Page<JSONObject> jsonPage = new Page<>(start, page.getPageSize(), (int) response.getHits().getTotalHits(), jsonsList);
jsonPage.addProperty("hit", jsonArray);// 附加命中结果

return jsonPage;


ES 导入 导出

java代码

导入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82

public class Input {

/**
* 文件保存路径
*/
private static final String filePath = "F:\\数据文件\\p1\\p1_10.json";

/**
* 索引名称
*/
private static final String indexName = "dwd-p1";

/**
* 类型名称
*/
private static final String typeName = "dwdata";

public static void main(String[] args) throws UnknownHostException {

Settings settings = Settings.settingsBuilder().put("cluster.name", "hinge-es").build();
TransportClient client = TransportClient.builder().settings(settings).build()
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.1.1"), 9300))
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.1.2"), 9300));

BufferedReader br = null;
try {
br = new BufferedReader(new FileReader(new File(filePath)));

String json;

int count = 0;
long total = 0;

BulkRequestBuilder bulkRequest = client.prepareBulk();

while ((json = br.readLine()) != null) {
count++;
total++;

bulkRequest.add(client.prepareIndex(indexName, typeName).setSource(json));

if (count == 500) {
BulkResponse bulkResponse = bulkRequest.get();
if (bulkResponse.hasFailures()) {
System.err.println("############ 出错了!!!!!");
}

bulkRequest = client.prepareBulk();
count = 0;
System.out.println("已经导入:" + total);
}

}

if (count != 0) {
BulkResponse bulkResponse = bulkRequest.get();
if (bulkResponse.hasFailures()) {
System.err.println("############ 出错了!!!!!");
}

System.out.println("已经导入:" + total);

}

System.out.println("导入结束");

} catch (Exception e) {
e.printStackTrace();
} finally {
if (br != null) {
try {
br.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}

}

}

导出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
public class Output {

/**
* 一批获取数据
*/
private static final int BATCH_SIZE = 10000;

/**
* 文件记录数
*/
private static final int FILE_RECORD = 300000;

/**
* 文件保存路径
*/
private static final String filePath = "F:\\数据文件\\dwd-p1\\";

/**
* 索引名称
*/
private static final String indexName = "dwd-p1";

/**
* 类型名称
*/
private static final String typeName = "dwdata";

public static void main(String[] args) throws UnknownHostException {

Settings settings = Settings.settingsBuilder().put("cluster.name", "hinge-es").build();
TransportClient client = TransportClient.builder().settings(settings).build()
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.1.1"), 9300))
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.1.2"), 9300));

SearchResponse scrollResp = client.prepareSearch(indexName).setTypes(typeName).setQuery(QueryBuilders.matchAllQuery()).setSize(BATCH_SIZE).setScroll(new TimeValue(600000))
.execute().actionGet();

int fileIndex = 0;

String outputFile = getFileName(fileIndex);

BufferedWriter out = null;

long totalSize = 0;

try {

out = new BufferedWriter(new FileWriter(outputFile));

while (true) {
for (SearchHit hit : scrollResp.getHits().getHits()) {

totalSize++;

out.write(hit.getId());// ID
out.write("\r\n");

out.write(hit.getSourceAsString());// 数据
out.write("\r\n");
out.flush();
}

System.out.println("已经导出:" + totalSize);

scrollResp = client.prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(60000)).execute().actionGet();

if (scrollResp.getHits().getHits().length == 0) {
break;
}

if (totalSize > FILE_RECORD) {
out.flush();
out.close();
out = null;

System.out.println(outputFile);
System.out.println("导出结束,总共导出数据:" + totalSize);
totalSize = 0;
fileIndex++;
outputFile = getFileName(fileIndex);
out = new BufferedWriter(new FileWriter(outputFile));
}
}

System.out.println(outputFile);
System.out.println("导出结束,总共导出数据:" + totalSize);

} catch (IOException e) {
e.printStackTrace();
} finally {
if (out != null) {
try {
out.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}

System.out.println("查询结束");
}

private static String getFileName(int fileIndex) {
String outputFile = filePath + indexName + "#" + typeName + "_" + fileIndex + ".json";
return outputFile;
}

}

ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务、互斥锁等。

ZooKeeper是一个高性能,可扩展的服务,只要集群中超过一半的节点正常,那么整个集群就可以正常对外提供服务。Zookeeper有数据一致性的保证:顺序一致性,客户端的更新会按照它们发送的次序排序;原子性,更新要么成功,要么失败,不会出现部分成功的(更新操作)结果。单独系统镜像,不管客户端连哪个服务器,它看来都是同一个。可靠性,一旦更新生效,它就会一直保存到下一次客户端更新。

安装

下载:

1
2
3
4
5
6
7
8
# 这个已经不是稳定了
wget http://archive.apache.org/dist/zookeeper/stable/zookeeper-3.4.6.tar.gz

#
http://archive.apache.org/dist/zookeeper/zookeeper-3.3.6/zookeeper-3.3.6.tar.gz
http://archive.apache.org/dist/zookeeper/zookeeper-3.4.8/zookeeper-3.4.8.tar.gz


解压与软链接:

1
2
3
4
5
6
tar -zxvf zookeeper-3.4.6.tar.gz -C /opt

#可以不要
ln -s /opt/zookeeper-3.4.6 /opt/zookeeper

chown -R zookeeper:zookeeper /opt/zookeeper*

复制配置文件

1
cp conf/zoo_sample.cfg conf/zoo.cfg

配置

zoo.cfg

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
tickTime=2000

#数据目录. 可以是任意目录
dataDir=/home/zookeeper/data

#log目录, 可以是任意目录
dataLogDir=/home/zookeeper/dataLog

#client连接的端口
clientPort=2181

#配置集群
server.1=192.168.1.213:2888:3888
server.2=192.168.1.214:2888:3888
server.3=192.168.1.215:2888:3888

server.A=B:C:D:其中A 是一个数字,表示这个是第几号服务器;B 是这个服务器的 ip 地址;C 表示的是这个服务器与集群中的 Leader 服务器交换信息的端口;D 表示的是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的 Leader,而这个端口就是用来执行选举时服务器相互通信的端口。

dataDir 目录中创建一个名为 myid 文件,在文件中写入节点ID,与zoo.cfg中的集群配置相对应

在集群模式中各server的dataDir目录下的myid文件中的数字必须不同

单机版直接解压缩启动就可以了,不用改什么配置

命令

启动

1
bin/zkServer.sh start  

查看状态

1
bin/zkServer.sh status

停止

1
bin/zkServer.sh stop

启动client连接server

1
bin/zkCli.sh -server localhost:2181  

查看状态

1
2
3
4
5
echo stat | nc 192.168.1.213 2181
echo stat | nc 192.168.1.214 2181
echo stat | nc 192.168.1.215 2181
echo stat | nc 192.168.1.216 2181
echo stat | nc 192.168.1.217 2181

服务化

【基于 CentOS 7】

在/usr/lib/systemd/system/ 目录中创建zookeeper.service文件。
文件内容如下:

日志目录和安装目录自己按照实际情况改

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
[Unit]
Description=Zookeeper Service
After=network.target

[Service]
Environment=ZOO_LOG_DIR=/data/zookeeper/zookeeper-3.4.8/
Type=forking
User=dap
ExecStart=/data/zookeeper/zookeeper-3.4.8/bin/zkServer.sh start
ExecStop=/data/zookeeper/zookeeper-3.4.8/bin/zkServer.sh stop
ExecReload=/data/zookeeper/zookeeper-3.4.8/bin/zkServer.sh restart
SuccessExitStatus=SIGKILL

[Install]
WantedBy=multi-user.target

再执行如下命令:

1
2
3
4
5
6
7
8
9
10
11
12
# 重新加载
systemctl daemon-reload

#启动服务
systemctl start zookeeper

#查看服务启动情况
systemctl status zookeeper

#设置为开机启动
systemctl enable zookeeper

0%