演示过程
首先开启服务器
打开一个SDK大于4.4的手机---B
打开一个SDK小于4.4的手机---A相互发送一条消息,对方就可以收到,当然这些消息都是通过服务器【转发】过来的
MainActivity
/**
* Activity启动以后就开启服务,服务开启后就通过ConnectorManager调用Connector中的connect方法对客户端进行认证
* 认证是通过三次握手完成的,服务器为所有认证的客户端新建一个线程,通过此线程和客户端通讯
* 当点击发送按钮后,通过ConnectorManager调用Connector中的方法把消息发送到一个阻塞队列中,最终发给服务器
* 服务器收到消息后将其【转发】给指定客户端的Connector的阻塞队列中,客户端又通过listener.pushData(text)转发消息
* 由于ConnectorManager注册了Connector的回调,ConnectorService又注册了ConnectorManager的回调
* 所以最终调用的是ConnectorService的回调方法pushData(data) ,而此方法又通过发送一条广播将消息转发出去
* 此广播会被PushReceiver接收到,因为其是在MainActivity注册的,所以最终MainActivity也收到了服务器转发过来的消息
*/
public class MainActivity extends Activity implements OnClickListener {
private EditText et;
private Button send;
private PushReceiver receiver = new PushReceiver();
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
et = (EditText) findViewById(R.id.et);
send = (Button) findViewById(R.id.send);
send.setOnClickListener(this);
//Activity启动以后就开启服务
startService(new Intent(this, ConnectorService.class));
//在代码中动态注册广播,这种类型的广播不是常驻型广播,也就是说广播跟随程序的生命周期
IntentFilter filter = new IntentFilter();
filter.addAction(PushReceiver.ACTION_TEXT);
registerReceiver(receiver, filter);
}
@Override
protected void onDestroy() {
super.onDestroy();
unregisterReceiver(receiver);
}
@Override
public void onClick(View v) {
switch (v.getId()) {
case R.id.send:
sendMessage();
break;
default:
break;
}
}
public void sendMessage() {
final String content = et.getText().toString().trim();
if (TextUtils.isEmpty(content)) return;
String sender = null;
String token = null;
String receiver = null;
if (Build.VERSION.SDK_INT > Build.VERSION_CODES.KITKAT) {
sender = "B";
receiver = "A";
token = "B";
} else {
sender = "A";
token = "A";
receiver = "B";
}
Request request = new TextRequest(sender, token, receiver, content);
ConnectorManager.getInstance().putRequest(request);
}
}
PushReceiver
public class PushReceiver extends BroadcastReceiver {
/**发送文本信息的事件*/
public static final String ACTION_TEXT = "com.bqt.action.text";
public static final String DATA_KEY = "data";
@Override
public void onReceive(Context context, Intent intent) {
String action = intent.getAction();
Log.d("activity", "receive");
if (PushReceiver.ACTION_TEXT.equals(action)) {
String text = intent.getStringExtra(PushReceiver.DATA_KEY);
Toast.makeText(context, text, Toast.LENGTH_SHORT).show();
}
}
}
Connector
/**用一个类把与Socket相关的连接、发送消息、断开连接等方法抽离出来,并通过回调方式把结果返回*/
public class Connector {
public static final String DST_NAME = "192.168.31.165";
public static final int DST_PORT = 10002;
private Socket client;
//有界阻塞队列,当容量满时往BlockingQueue中添加数据时会阻塞,当容量为空时取元素操作会阻塞。
private ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(8);
private ConnectorListener listener;
/**连接*/
public void connect() {
try {
// 三次握手
if (client == null || client.isClosed()) client = new Socket(DST_NAME, DST_PORT);
new Thread(new Runnable() {
@Override
public void run() {
// 数据通讯
OutputStream os;
try {
os = client.getOutputStream();
// os.write(content.getBytes());
while (true) {
String content = queue.take();
os.write(content.getBytes());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
InputStream is = client.getInputStream();
byte[] buffer = new byte[1024];
int len = -1;
while ((len = is.read(buffer)) != -1) {
final String text = new String(buffer, 0, len);
System.out.println("服务器转发的消息 : " + text);
//获取服务器向客户端转发的消息
if (listener != null) listener.pushData(text);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
} catch (Exception e) {
e.printStackTrace();
}
}
/**认证*/
public void auth(String auth) {
putRequest(auth);
}
/**发送消息*/
public void putRequest(String content) {
try {
queue.put(content);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**断开连接*/
public void disconnect() {
try {
if (client != null && !client.isClosed()) {
client.close();
client = null;
}
} catch (IOException e) {
e.printStackTrace();
}
}
public void setConnectorListener(ConnectorListener listener) {
this.listener = listener;
}
}
ConnectorListener
public interface ConnectorListener {
void pushData(String data);
}
ConnectorManager
/**管理Connector的方法,目的:隐藏实现细节,简化对外暴露的方法*/
public class ConnectorManager implements ConnectorListener {
private static ConnectorManager instance;
private Connector connector;
private ConnectorListener listener;
private ConnectorManager() {
}
public static ConnectorManager getInstance() {
if (instance == null) {
synchronized (ConnectorManager.class) {
if (instance == null) instance = new ConnectorManager();
}
}
return instance;
}
/**连接、注册监听、认证*/
public void connnect(AuthRequest auth) {
connector = new Connector();
connector.setConnectorListener(this);
connector.connect();
connector.auth(auth.getData());
}
/**发送消息*/
public void putRequest(Request request) {
connector.putRequest(request.getData());
}
@Override
public void pushData(String data) {
if (listener != null) listener.pushData(data);
}
public void setConnectorListener(ConnectorListener listener) {
this.listener = listener;
}
}
ConnectorService
/**
* 作用①:Activity启动以后就开启服务通过ConnectorManager调用Connector中的connect方法对客户端进行认证
* 作用②:客户端收到服务器转发过来的消息后通过ConnectorService的pushData通过发送一条广播将消息转发出去
* 注意:为简化代码,若客户端(手机)SDK版本小于4.4则定义为A手机,否则就定义为B手机,请演示时一定注意!
*/
public class ConnectorService extends Service implements ConnectorListener {
private ConnectorManager connectorManager;
@Override
public IBinder onBind(Intent intent) {
return null;
}
@Override
public void onCreate() {
super.onCreate();
connectorManager = ConnectorManager.getInstance();
new Thread(new Runnable() {
@Override
public void run() {
connectorManager.setConnectorListener(ConnectorService.this);
//认证
AuthRequest request = null;
//当前SDK版本大于4.4---暂时这么区分两部手机,实际肯定不是这么搞得
if (Build.VERSION.SDK_INT > Build.VERSION_CODES.KITKAT) request = new AuthRequest("B", "B");
else request = new AuthRequest("A", "A");
connectorManager.connnect(request);
}
}).start();
}
@Override
public void pushData(String data) {
Log.d("coreService", "data : " + data);
Intent intent = new Intent();
intent.setAction(PushReceiver.ACTION_TEXT);
intent.putExtra(PushReceiver.DATA_KEY, data);
sendBroadcast(intent);
}
}
服务端代码
public class TCPServer {
private static final int port = 10002;
/**保存并标记所有建立连接的客户端*/
private static Map<String, Socket> clients = new LinkedHashMap<String, Socket>();
public static void main(String[] args) {
try {
ServerSocket server = new ServerSocket(port);
while (true) {
System.out.println("准备阻塞...");
// 获得客户端连接,阻塞式方法
final Socket client = server.accept();
System.out.println("阻塞完成...");
//每连接一个客户端就新建一个线程
new Thread(new Runnable() {
@Override
public void run() {
try {
// 获取客户端的输入流,也即客户端发送的数据。
//注意输入流和输出流相对于内存设备而言,将外设中的数据读取到内存中就是输入
InputStream is = client.getInputStream();
// 输出流,给客户端写数据
OutputStream os = client.getOutputStream();
byte[] buffer = new byte[1024];
int len = -1;
System.out.println("准备read...");
while ((len = is.read(buffer)) != -1) {
System.out.println("read完成...");
String text = new String(buffer, 0, len);
System.out.println(text);
//将客户端发送的json串转换为map
Map<String, String> map = new Gson().fromJson(text, new TypeToken<Map<String, String>>() {
}.getType());
String type = map.get("type");
if ("request".equals(type)) {
String action = map.get("action");
if ("auth".equals(action)) {
// 认证消息处理
String sender = map.get("sender");
System.out.println(sender + "认证");
// 放到容器当中
clients.put(sender, client);
} else if ("text".equals(action)) {
// 文本消息
String sender = map.get("sender");//客户端写死了
String receiver = map.get("receiver");
String content = map.get("content");
Socket s = clients.get(receiver);
if (s != null) {// 在线
OutputStream output = s.getOutputStream();
output.write(content.getBytes());
} else { // 离线
}
}
} else System.out.println("格式错误");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}