peace唠叨

利用Thrift和zk简单实现服务治理框架中的订阅发布机制

本文简单介绍下利用Thrift和zk简单实现服务治理框架服务的订阅发布机制,类似于Dubbo的服务治理。这个只是简单版本,只供学习和理解用。

全部代码下载:Github链接:github链接,点击惊喜;写文章不易,欢迎大家采我的文章,以及给出有用的评论,当然大家也可以关注一下我的github;多谢;

1.什么是服务治理:

1.1微服务简单介绍:

微服务已经成为当下最热门的话题之一。它是一种新的架构风格,涉及组织架构、设计、交付、运维等方面的变革,核心目标是为了解决系统的交付周期,并降低维护成本和研发成本。相比传统的SOA架构或者单块架构,微服务有很多的优势,比如技术的多样性、模块化、独立部署等,但也带来了相应的成本,比如运维成本、服务管理成本等。

1.2服务治理的出现

在微服务盛行下,利用RMI或Hessian等工具,简单的暴露和引用远程服务,通过配置服务的URL地址进行调用已经变得越来越不能满足需求。
1.服务越来越多时,服务URL配置管理变得非常困难。
2.服务间依赖关系变得错踪复杂
3.服务的调用量越来越大,服务的容量问题就暴露出来,这个服务需要多少机器支撑?什么时候该加机器?
4…….等等
为了满足服务线下管控、保障线上高效运行,需要有一个统一的服务治理框架对服务进行统一、有效管控,保障服务的高效、健康运行。服务治理是分布式服务框架的一个可选特性,尽管从服务开发和运行角度看它不是必须的,但是如果没有服务治理功能,分布式服务框架的服务SLA很难得到保障,服务化也很难真正实施成功。
基于以上原因,需要对各个服务做治理,这也是就为什么有了dubbo这类服务治理框架,它与其他RPC框架相比(例如thrift,avro),不仅仅提供了透明的服务调用,而且还提供了服务治理,比如上述的调用统计管理、负载均衡,这样每个业务模块只需专注于自己的内部业务逻辑即可。

1.3服务治理的几个要素:

1
2
3
4
5
6
7
服务管理组件:这个组件是“服务治理”的核心组件,您的服务治理框架有多强大,主要取决于您的服务管理组件功能有多强大。它至少具有的功能包括:服务注册管理、访问路由;另外,它还可以具有:服务版本管理、服务优先级管理、访问权限管理、请求数量限制、连通性管理、注册服务集群、节点容错、事件订阅-发布、状态监控,等等功能。

服务提供者(服务生产者):即服务的具体实现,然后按照服务治理框架特定的规范发布到服务管理组件中。这意味着什么呢?这意味着,服务提供者不一定按照RPC调用的方式发布服务,而是按照整个服务治理框架所规定的方式进行发布(如果服务治理框架要求服务提供者以RPC调用的形式进行发布,那么服务提供者就必须以RPC调用的形式进行发布;如果服务治理框架要求服务提供者以Http接口的形式进行发布,那么服务提供者就必须以Http接口的形式进行发布,但后者这种情况一般不会出现)。

服务使用者(服务消费者):即调用这个服务的用户,调用者首先到服务管理组件中查询具体的服务所在的位置;服务管理组件收到查询请求后,将向它返回具体的服务所在位置(视服务管理组件功能的不同,还有可能进行这些计算:判断服务调用者是否有权限进行调用、是否需要生成认证标记、是否需要重新检查服务提供者的状态、让调用者使用哪一个服务版本等等)。服务调用者在收到具体的服务位置后,向服务提供者发起正式请求,并且返回相应的结果。第二次调用时,服务请求者就可以像服务提供者直接发起调用请求了(当然,您可以有一个服务提供期限的设置,使用租约协议就可以很好的实现)。

参考于: http://blog.csdn.net/yinwenjie/article/details/49869535

简单画了如下图:
00

1.4服务的订阅发布机制

它的核心理念是实现服务消费者和服务提供者的解耦,让服务消费者能够像使用本地接口一样消费远端的服务提供者,而不需要关心服务提供者的位置信息,实现透明化调用。常用的服务注册中心有Zookeeper、ETCD,以及基于数据库的配置中心。

2.设计一个服务治理框架中的订阅发布机制

2.1使用的技术:

1.Zookeeper作为注册中心并进行管控。具体介绍请参考我的博客: Hadoop集群之 ZooKeeper和Hbase环境搭建
2.Thrift提供RPC调用功能。具体介绍请参考我的博客:Apache Thrift入门学习

2.2设计思路

1.利用Zookeeper建立/Service根目录,在该目录下建立相应的服务接口子目录存放该接口的IP地址和端口号—-注册服务
2.利用Thrift创建服务和启动服务
3.利用Zookeeper去对应目录/Service订阅相应服务获得接口的IP地址和端口号,并注册监听事件,当目录改变时更新接口的IP地址和端口号—-订阅服务

3.实现订阅发布机制

3.1实现步骤:

1.编写Thrift的IDL并编译出相应的接口类。
2.实现相应的接口。
3.编写服务启动和注册服务类。
4.编写相应的客户端订阅服务。

3.2代码实现

工程为maven工程,假如不建立maven工程,请下载对应的lib包。
具体实现原理,见注释:
1.IDL文件和编译:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
//命名空间定义:java包
namespace java cn.wpeace.thrift
//结构体定义:转化java中的实体类
struct Request{
1:required string userName;
2:required string password;
}
//定义返回类型
struct Student{
1:required string naem;
2:required i32 age;
}
struct People{
1:required string naem;
2:required i32 age;
3:required string sex;
}
//异常描述定义
exception HelloException{
1:required string msg;
}
//服务定义,生成接口用
service StudentService{
list<Student> getAllStudent(1:Request request)throws (1:HelloException e);
}
//服务定义,生成接口用
service PeopleService{
list<People> getAllPeople(1:Request request)throws (1:HelloException e);
}
//thrift -gen java ./zk.thrift

2.实现相应接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class StudentServiceImpl implements Iface {// 实现的是StudentService类下面的接口
@Override
public List<Student> getAllStudent(Request request) throws HelloException, TException {
System.out.println("调用studentService");
System.out.println(request.getUserName());
System.out.println(request.getPassword());
List<Student> students = new ArrayList<>();
for (int i = 0; i < 5; i++) {
Student student = new Student();
student.setNaem("peace" + i);
student.setAge(22 + i);
students.add(student);
}
return students;
}
}
public class PeopleServiceImpl implements Iface{
@Override
public List<People> getAllPeople(Request request) throws HelloException, TException {
System.out.println("调用PeopleService");
System.out.println(request.getUserName());
System.out.println(request.getPassword());
List<People>peoples=new ArrayList<>();
for(int i=0;i<5;i++)
{
People people=new People("wpeace", 22+i, "男");
peoples.add(people);
}
return peoples;
}
}

3.实现服务启动和注册类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
package cn.wpeace.thriftService;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.log4j.BasicConfigurator;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.server.ServerContext;
import org.apache.thrift.server.TNonblockingServer;
import org.apache.thrift.server.TServerEventHandler;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
import cn.wpeace.thrift.PeopleService;
import cn.wpeace.thrift.StudentService;
import net.sf.json.JSONObject;
public class ServiceSatrt implements Watcher{
//初始化log4j
static{
BasicConfigurator.configure();
}
private static final Log LOGGER=LogFactory.getLog(ServiceSatrt.class);
private static final Integer[] PORTS={8081,8082};
public static final String serviceNames[]={"studentService","peopleService"};
private static final String SERVICE_IP="192.168.1.118";
private CountDownLatch connectedSignal=new CountDownLatch(1);//用于建立连接
private ZooKeeper zk ;
/**
* thrift服务启动标记
*/

private Integer isThriftStart=0;

/**
* 启动所有服务
*/

private void startServer(){
ServiceSatrt.LOGGER.info("启动Thrift线程");
// 创建启动线程:
StartServerThread studenThread = new StartServerThread(PORTS[0],
new StudentService.Processor<StudentService.Iface>(new StudentServiceImpl()));
StartServerThread peopleThread = new StartServerThread(PORTS[1],
new PeopleService.Processor<PeopleService.Iface>(new PeopleServiceImpl()));
ExecutorService pool = Executors.newFixedThreadPool(2);

pool.submit(studenThread);
pool.submit(peopleThread);
//关闭线程池:线程仍然在运行
pool.shutdown();
}
private class StartServerThread implements Runnable{
private Integer port;
private TProcessor processor;
public StartServerThread(Integer port,TProcessor processor) {
this.port=port;
this.processor=processor;
}
@Override
public void run() {
ServiceSatrt.LOGGER.info("thrift服务正在准备启动");
try {
// 非阻塞式
TNonblockingServerSocket serverSocket=new TNonblockingServerSocket(port);
// 为服务器设置对应的IO网络模型
TNonblockingServer.Args tArgs = new TNonblockingServer.Args(serverSocket);
// 设置控制器
tArgs.processor(processor);
// 设置消息封装格式
tArgs.protocolFactory(new TBinaryProtocol.Factory());//Thrift特有的一种二进制描述格式
// 启动Thrift服务
TNonblockingServer server = new TNonblockingServer(tArgs);
server.setServerEventHandler(new StartServerEventHander());
server.serve();//启动后,程序就停在这里了。
} catch (TTransportException e) {
e.printStackTrace();
}

}

}
private class StartServerEventHander implements TServerEventHandler{

@Override
public void preServe() {
synchronized (isThriftStart) {
isThriftStart++;//当全部服务启动成功才连接zk
if(isThriftStart==2){
synchronized (ServiceSatrt.this) {
ServiceSatrt.LOGGER.info("thrift服务启动完成");
ServiceSatrt.this.notify();
}
}
}
}
@Override
public ServerContext createContext(TProtocol arg0, TProtocol arg1) {
return null;
}
@Override
public void deleteContext(ServerContext arg0, TProtocol arg1, TProtocol arg2) {
}
@Override
public void processContext(ServerContext arg0, TTransport arg1, TTransport arg2) {
}
}
private void connectZk() throws KeeperException, InterruptedException, IOException{
// 连接到zk服务器集群,添加默认的watcher监听
zk= new ZooKeeper("192.168.1.127:2181", 120000, this);
connectedSignal.await();
// 创建一个父级节点Service
Stat pathStat = null;
try {
pathStat = zk.exists("/Service", false);
// 如果条件成立,说明节点不存在(只需要判断一个节点的存在性即可)
// 创建的这个节点是一个“永久状态”的节点
if (pathStat == null) {
zk.create("/Service", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (Exception e) {
System.exit(-1);
}
// 开始添加子级节点,每一个子级节点都表示一个这个服务提供者提供的业务服务
for (int i = 0; i < 2; i++) {
JSONObject nodeData = new JSONObject();
nodeData.put("ip", SERVICE_IP);
nodeData.put("port", PORTS[i]);
zk.create("/Service/" + serviceNames[i], nodeData.toString().getBytes(), Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL);
}
// 执行到这里,说明所有的service都启动完成了
ServiceSatrt.LOGGER.info("===================所有service都启动完成了,主线程开始启动===================");
}
@Override
public void process(WatchedEvent event) {
//建立连接用
if(event.getState()==KeeperState.SyncConnected){
connectedSignal.countDown();
return;
}
//暂在这里不做处理,正常情况下需要处理。

}
public static void main(String[] args) {
//启动服务
ServiceSatrt serviceSatrt=new ServiceSatrt();
serviceSatrt.startServer();
//等待服务启动完成
synchronized (serviceSatrt) {
try {
while (serviceSatrt.isThriftStart<2) {
serviceSatrt.wait();
}
} catch (Exception e) {
ServiceSatrt.LOGGER.error(e);
System.out.println(-1);
}
}
//启动连接
try {
serviceSatrt.connectZk();
} catch (Exception e) {
ServiceSatrt.LOGGER.error(e);
System.out.println(-1);
}
}
}

4.编写客户端类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
package cn.wpeace.thriftClinet;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import javax.sound.midi.VoiceStatus;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.log4j.BasicConfigurator;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;

import cn.wpeace.thrift.People;
import cn.wpeace.thrift.PeopleService;
import cn.wpeace.thrift.Request;
import cn.wpeace.thrift.Student;
import cn.wpeace.thrift.StudentService;
import cn.wpeace.thriftService.ServiceSatrt;
import net.sf.json.JSONObject;

public class ThriftClinet implements Watcher{
static{
BasicConfigurator.configure();
}
private static final Log LOGGER=LogFactory.getLog(ThriftClinet.class);
private String serverIp;
private String serverPort;
private String servername;
private CountDownLatch connectedSignal=new CountDownLatch(1);//用于建立连接
private ZooKeeper zk;
private void init(String servername) throws IOException, KeeperException, InterruptedException{
// 连接到zk服务器集群,添加默认的watcher监听
this.zk = new ZooKeeper("192.168.1.127:2181", 120000, this);
connectedSignal.await();
this.servername=servername;
updateServer();
ThriftClinet.LOGGER.info("初始化完成");
}
/**
* 从zk上获取Service中的节点数据:包括IP和端口
* @throws KeeperException
* @throws InterruptedException
*/

private void updateServer() throws KeeperException, InterruptedException {
this.serverIp=null;
this.serverPort=null;
/*
*
* 判断服务根节点是否存在
*/

Stat pathStat = null;
try {
pathStat = this.zk.exists("/Service", false);
// 如果条件成立,说明节点不存在
// 创建的这个节点是一个“永久状态”的节点
if (pathStat == null) {
ThriftClinet.LOGGER.info("客户端创立Service");
this.zk.create("/Service", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
return;
}
} catch (Exception e) {
ThriftClinet.LOGGER.error(e);
System.exit(-1);
}
// 获取服务列表
List<String> serviceList = this.zk.getChildren("/Service", false);
if (serviceList == null || serviceList.isEmpty()) {
ThriftClinet.LOGGER.info("未发现相关服务,客户端退出");
return;
}
// 查找所需的服务是否存在
boolean isFound = false;
byte[] data;// 获取节点数据
for (String name : serviceList) {
if (StringUtils.equals(name, this.servername)) {
isFound = true;
break;// 找到一个就退出
}
}
// 获得数据
if (isFound) {
data = this.zk.getData("/Service/" + this.servername, false, null);
} else {
ThriftClinet.LOGGER.info("未发现相关服务,客户端退出");
return;
}
if (data == null || data.length == 0) {
ThriftClinet.LOGGER.info("没有发现有效数据,客户端退出");
return;
}
JSONObject fromObject = JSONObject.fromObject(new String(data));
this.serverIp = fromObject.getString("ip");
this.serverPort = fromObject.getString("port");
}

@Override
public void process(WatchedEvent event) {
//建立连接用
if(event.getState()==KeeperState.SyncConnected){
connectedSignal.countDown();
return;
}
//如果发生 Service下的节点变换,就更新ip和端口
if (event.getType() == EventType.NodeChildrenChanged
&& "/Service".equals(event.getPath())) {
try {
updateServer();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
ThriftClinet studentClinet=new ThriftClinet();
ThriftClinet peopleClinet=new ThriftClinet();
/**
* studnetService 测试
*/

try {
studentClinet.init(ServiceSatrt.serviceNames[0]);
if(studentClinet.serverIp==null||studentClinet.serverPort==null){
ThriftClinet.LOGGER.info("没有发现有效数据,客户端退出");
}
//如果是非阻塞型 需要使用
TTransport tSocket = new TFramedTransport(new TSocket(studentClinet.serverIp,
Integer.parseInt(studentClinet.serverPort), 30000));
//设置封装协议
TBinaryProtocol protocol = new TBinaryProtocol(tSocket);
//建立调用client
StudentService.Client client=new StudentService.Client(protocol);
//设置调用参数:
Request request=new Request().setUserName("peace").setPassword("123456");
//准备传输
tSocket.open();
//正式调用接口
List<Student> allStudent = client.getAllStudent(request);
//请求结束,断开连接
tSocket.close();
for(Student student:allStudent)
{
System.out.println(student.getNaem()+":"+student.getAge());
}
} catch (Exception e) {
ThriftClinet.LOGGER.info("出现异常,客户端退出");
}

/**
* PeopleService测试
*/

try {
peopleClinet.init(ServiceSatrt.serviceNames[1]);
if(peopleClinet.serverIp==null||peopleClinet.serverPort==null){
ThriftClinet.LOGGER.info("没有发现有效数据,客户端退出");
}
//如果是非阻塞型 需要使用
TTransport tSocket = new TFramedTransport(new TSocket(peopleClinet.serverIp,
Integer.parseInt(peopleClinet.serverPort), 30000));
//设置封装协议
TBinaryProtocol protocol = new TBinaryProtocol(tSocket);
//建立调用client
PeopleService.Client client=new PeopleService.Client(protocol);
//设置调用参数:
Request request=new Request().setUserName("peace").setPassword("123456");
//准备传输
tSocket.open();
//正式调用接口
List<People> allPeople = client.getAllPeople(request);
//请求结束,断开连接
tSocket.close();
for(People people:allPeople)
{
System.out.println(people.getNaem()+":"+people.getAge()+"性别"+people.getSex());
}
} catch (Exception e) {
ThriftClinet.LOGGER.info("出现异常,客户端退出");
}
}
}

所有代码下载请见github,上面的链接。

3.3测试步骤:

1.启动ServiceSatrt类
2.启动ThriftClinet类
3.测试结果:
服务端:
01
02
客户端:
03
04
本文来自伊豚(blog.wpeace.cn)

Peace wechat
欢迎您扫一扫上面的微信公众号,订阅我的博客!
坚持原创技术分享,您的支持将鼓励我继续创作!