socket 通讯检测客户端非正常断线。_socketrunnable::clientdisconnect-程序员宅基地

技术标签: java  socket  非正常断开  

package com.ist.socket;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.Date;

/**
 *  Socket收发器 通过Socket发送数据,并使用新线程监听Socket接收到的数据
 * @author songqiang
 * @createtime 2015-12-15
 */
public abstract class MediaTransceiver implements Runnable{

	protected Socket socket;
	protected InetAddress addr;
	protected String falshId;
	protected DataInputStream in;
	protected DataOutputStream out;
	private boolean runFlag;
	//是否在线标识
	private boolean onlineFlag;
	/**
	 * 服务器端实例化
	 * @param socket
	 */
	public MediaTransceiver(Socket socket) {
		this.socket = socket;
		this.addr = socket.getInetAddress();
	}
	/**
	 * 监听Socket接收的数据(新线程中运行)
	 */
	@Override
	public void run() {
		try {
			//socket输入流
			in = new DataInputStream(this.socket.getInputStream());
			//socket输出流
			out = new DataOutputStream(this.socket.getOutputStream());
		} catch (IOException e) {
			e.printStackTrace();
			runFlag = false;
		}
		//无限接收服务端信息,直到连接端口
		while(runFlag){
			try{
				//接受数据
				final String s = in.readUTF();
				if(s.equals("1")){
					onlineFlag = true;
				}
				this.onReceive(addr, s);
			}catch(EOFException e){
				
			}catch (IOException e){
				// 连接被断开(被动)
				runFlag = false;
			}
		}
		//断开连接
		try {
			in.close();
			out.close();
			socket.close();
			in = null;
			out = null;
			socket = null;
		} catch (IOException e) {
			e.printStackTrace();
		}
		this.onDisconnect(addr);
	}
	static void delay() {
		try {
			Thread.sleep(3000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
	/**
	 * 开启Socket收发
	 * 如果开启失败,会断开连接并回调{@code onDisconnect()}
	 */
	public void start(){
		runFlag = true;
		new Thread(this).start();
	}

	/**
	 * 断开连接(主动)
	 * 连接断开后,会回调{@code onDisconnect()}
	 */
	public void stop(){
		runFlag = false;
		try {
			socket.shutdownInput();
			in.close();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	//向服务器发送falshId
	public void sendFalshId(String falshId){
		if(out!=null){
			try{
				send("falshId:"+falshId);
			}catch(Exception e){
				e.printStackTrace();
			}
		}else{
			if(runFlag){
				try {
					Thread.sleep(1000);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				sendFalshId(falshId);
			}
		}
	}
	/**
	 * 向服务端发送信息
	 * @param s
	 * @return
	 */
	public boolean send(String s){
		if(out!=null){
			try{
				out.writeUTF(s);
				out.flush();
				return true;
			}catch(Exception e){
				e.printStackTrace();
			}
		}
		return false;
	}
	/**
	 * 检查socket是否执行
	 */
	public void checkOnLine(){
		send("0");
		onlineFlag = false;
		//线程等待onlineFlag标识是否改变
		Thread checkThread = new Thread(new Runnable() {		
			@Override
			public void run() {
				delay();
				//标识没有改变则判断为离线
				if(!onlineFlag){
					stop();
				}
			}
		});
		checkThread.start();
	}
	/**
	 * 获取连接到的Socket地址
	 * 
	 * @return InetAddress对象
	 */
	public InetAddress getInetAddress() {
		return addr;
	}
	
	public String getFalshId() {
		return falshId;
	}

	public void setFalshId(String falshId) {
		this.falshId = falshId;
	}
	//-------------------------------实例化时实现---------------------------
	/**
	 * 接收到数据
	 * 注意:此回调是在新线程中执行的
	 * @param addr 连接到的Socket地址
	 * @param s:收到的字符串
	 */
	public abstract void onReceive(InetAddress addr, String s);

	/**
	 * 连接断开
	 * 注意:此回调是在新线程中执行的
	 * @param addr
	 * 连接到的Socket地址
	 */
	public abstract void onDisconnect(InetAddress addr);
	
}
public abstract class MediaServer implements Runnable{
	private static MediaServer mediaServer=null;
	//监听端口
	private int port;
	//服务端运行标识
	private boolean runFlag;
	//客户端响应集合(ConcurrentHashMap线程安全解决并发修改问题)
	private Map
    
    
     
      results = new ConcurrentHashMap
     
     
      
      ();
	//客户端集合
	private Map
      
      
       
        clinets = new ConcurrentHashMap
       
       
        
        ();
	/**
	 * 实例化服务端
	 * @param port
	 */
	private MediaServer(int port) {
		this.port = port;
	}
	//加同步锁(线程安全)
	private static synchronized void syncInit(){
		if(mediaServer==null){
			mediaServer = new MediaServer(port) {
				//服务器停止
				@Override
				public void onServerStop() {
					
				}
				//接收方法
				@Override
				public void onReceive(MediaTransceiver client, String s) {
					this.getResults().put(client.falshId, s);
					delay(1);
					//System.out.println("接收结果:"+this.toString()+":"+this.getResults().get(client.falshId));
				}
				//断开连接
				@Override
				public void onDisconnect(MediaTransceiver client) {
					if(null!=client){
						this.getClinets().remove(client.falshId);
						this.getResults().remove(client.falshId);
						updateLogoutTime(client.falshId);
					}
				}
				//连接失败
				@Override
				public void onConnectFailed() {
					System.out.println("连接失败!");
				}
				//连接上
				@Override
				public void onConnect(MediaTransceiver client) {
					
				}		
			};
			mediaServer.start();	
		}
	}
	/**
	 * 获取服务器实例与创建实例分开
	 * 如果在构造函数中抛出异常,实例将永远得不到创建
	 * @return
	 */
	public static synchronized MediaServer getInstance(){
		if(mediaServer == null){
			syncInit();
			mediaServer.updateAllOffLine();
		}
		mediaServer.checkOnLine();
		return mediaServer;
	}
	/**
	 * 服务器启动
	 * 如果启动失败,会回调onServerStop()
	 */
	public void start(){
		runFlag = true;
		new Thread(this).start();
	}
	/**
	 * 服务器停止
	 * 服务器停止后,会回调{@code onServerStop()}
	 */
	public void stop(){
		runFlag = false;
	}
	/**
	 * 监听端口,接受客户端连接(新线程中运行)
	 */
	@Override
	public void run() {
		try{
			final ServerSocket server = new ServerSocket(port);
			//无限等待启动客户端,直到服务器关闭
			while(runFlag){
				try{
					final Socket socket = server.accept();
					//socket.setSoTimeout(10000);//十秒连接超时
					//与客户端建立连接
					startClinet(socket);
				} catch (IOException e) {
					// ServerSocket对象创建出错,服务器启动失败
					this.onConnectFailed();
				}
			}
			//停止服务器
			try{
				for(String key:clinets.keySet()){
					clinets.get(key).stop();
				}
				clinets.clear();
				results.clear();
				server.close();
				mediaServer=null;
			}catch (IOException e) {
				// ServerSocket对象创建出错,服务器启动失败
				e.printStackTrace();
			}
			
		} catch (IOException e) {
			// ServerSocket对象创建出错,服务器启动失败
			e.printStackTrace();
		}
		this.onServerStop();
	}
	
	/**
	 *  启动客户端收发
	 * @param socket
	 */
	public void startClinet(Socket socket){
		//服务器端接收对象
		MediaTransceiver clinet = new MediaTransceiver(socket) {
			//接收信息
			@Override
			public void onReceive(InetAddress addr, String s) {
				if("0".equals(s)){
					this.send("1");
				}else{
					System.out.println("接收:"+s);
					if(null!=s && s.startsWith("falshId:")){//添加客户端
						s=s.replace("falshId:", "");	
						if(s!=null && !"null".equals(s.trim()) && hasFalshId(s)){
							this.falshId=s;
							clinets.put(this.falshId,this);
							updateOnline(falshId);
						}
					}else if(this.falshId!=null){//接受发送消息
						System.out.println(this.falshId+":"+s);
						MediaServer.this.onReceive(this, s);
					}else{
						//System.out.println("返回值"+s);
					}
				}
			}
			//服务器断开连接
			@Override
			public void onDisconnect(InetAddress addr) {
				if(null!=this && null!= this.falshId){
					MediaServer.this.onDisconnect(this);
				}
			}			
		};
		clinet.start();
		this.onConnect(clinet);
	}
	//向客户端发送命令
	public boolean send(String falshId,String s){	
		for(String key:clinets.keySet()){
			MediaTransceiver mt = clinets.get(key);
			if(null!=mt && mt.falshId!=null && mt.falshId.equals(falshId)){
				//连接不为空且soket是连接着的
				if(null!=mt.socket && mt.socket.isConnected()){
					mt.send(s);
					return true;
				}	
			}
		}
		return false;
	}
	//等待客户端响应
	public String getResult(String falshId){
		String resultstr = this.getResults().get(falshId);
		if(null == resultstr){
			return "";
		}else{//去掉已返回命令
			this.getResults().remove(falshId);	
		}
		return resultstr;
	}
	//等待客户端响应
	public Map
        
        
          getResults(){ return results; } public Map 
         
           getClinets() { return clinets; } //修改离线时间 public void updateLogoutTime(String falshId){ 具体实现 } //修改在线终端 public void updateOnline(String falshId){ 具体实现 } public boolean hasFalshId(String falshId){ 具体实现 } //修改所有终端为离线 public void updateAllOffLine(){ 具体实现 } /** * 检查是否在线 */ public void checkOnLine(){ for(String clinetId:clinets.keySet()){ MediaTransceiver clinet = clinets.get(clinetId); clinet.checkOnLine(); } } static void delay(int count) { try { Thread.sleep(count*1000); } catch (InterruptedException e) { e.printStackTrace(); } } //---------------------------具体实现交给子类,或是实例化时实现----------------------- /** * 客户端:连接建立 * 注意:此回调是在新线程中执行的 * @param client 客户端对象 */ public abstract void onConnect(MediaTransceiver client); /** * 客户端:连接建立失败 * 注意:此回调是在新线程中执行的 */ public abstract void onConnectFailed(); /** * 客户端:收到字符串 * 注意:此回调是在新线程中执行的 * @param client 客户端对象 * @param s 字符串 */ public abstract void onReceive(MediaTransceiver client, String s); /** * 客户端:连接断开 * 注意:此回调是在新线程中执行的 * @param client MediaCline客户端对象 */ public abstract void onDisconnect(MediaTransceiver client); /** * 服务器停止 * 注意:此回调是在新线程中执行的 */ public abstract void onServerStop(); 
          
        
       
       
      
      
     
     
    
    
package com.ist.socket;

import java.net.InetAddress;
import java.net.Socket;


public abstract class MediaClinet implements Runnable{
	
	private int port;
	private String hostIP;
	private boolean mBlnNetIsOk = false;
	private MediaTransceiver transceiver;
	//终端标识码
	private String falshId;
	
	public MediaClinet(String falshId) {
		this.falshId = falshId;
	}
	/**
	 * 建立连接
	 * 连接的建立将在新线程中进行
	 * 连接建立成功,回调{@code onConnect()}
	 * 连接建立失败,回调{@code onConnectFailed()}
	 * @param hostIP 服务器主机IP
	 * @param port 端口
	 */
	public void connect(String hostIP, int port) {
		this.hostIP = hostIP;
		this.port = port;
		new Thread(this).start();
	}
	
	private void connect(){
		try{
			Socket socket = new Socket(hostIP, port);
			transceiver = new MediaTransceiver(socket) {
				
				@Override
				public void onReceive(InetAddress addr, String s) {
					if(s.equals("0")){
						this.send("1");
					}
					MediaClinet.this.onReceive(transceiver, s);
					
				}
				
				@Override
				public void onDisconnect(InetAddress addr) {
					mBlnNetIsOk = false;
					MediaClinet.this.onDisconnect(transceiver,falshId);
				}
			};
			transceiver.start();
			mBlnNetIsOk = true;
			transceiver.sendFalshId(falshId);
			this.onConnect(transceiver,falshId);
			
		} catch (Exception e) {
			e.printStackTrace();
			this.onConnectFailed();
		}
	}
	@Override
	public void run(){
		connect();
		try {
			Thread.sleep(4000);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		while (true) {	
			if (mBlnNetIsOk == false) {
				connect();
			}
			try {
				Thread.sleep(3000);
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
	}
	private boolean sendPacket(String string) {
		// TODO Auto-generated method stub
		if(transceiver!=null){
			return transceiver.send(string);
		}
		return false;
	}
	/**
	 * 断开连接
	 * 连接断开,回调{@code onDisconnect()}
	 */
	public void disconnect() {
		if (transceiver != null) {
			transceiver.stop();
			transceiver = null;
		}
	}
	/**
	 * 获取Socket收发器
	 * 
	 * @return 未连接则返回null
	 */
	public MediaTransceiver getTransceiver() {
		return isConnected() ? transceiver : null;
	}

	
	/**
	 * 判断是否连接
	 * 
	 * @return 当前处于连接状态,则返回true
	 */
	public boolean isConnected() {
		return mBlnNetIsOk;
	}
	/**
	 * 连接建立
	 * @param transceiver SocketTransceiver对象
	 */
	public abstract void onConnect(MediaTransceiver transceiver,String falshId);

	/**
	 * 连接建立失败
	 */
	public abstract void onConnectFailed();

	/**
	 * 接收到数据
	 * 注意:此回调是在新线程中执行的
	 * @param transceiver Socket收发对象
	 * @param s 字符串
	 */
	public abstract void onReceive(MediaTransceiver transceiver, String s);

	/**
	 * 连接断开
	 * 注意:此回调是在新线程中执行的
	 * @param transceiver Socket收发对象
	 */
	public abstract void onDisconnect(MediaTransceiver transceiver,String falshId);
	
	
}

这个socket 通讯类大体框架是借鉴某个高手的博客里面的,具体我就不记得了(我只能说对不起了)。我大概说一下我代码里面的功能:

1.MediaServer 采用的是单例模式,没当有一个clinet 连接的时候就会创建一个MediaTransceiver(接收者)对象。

2.MediaClinet 创建一个MediaTransceiver(接收者)对象并发送一个标识自己的falshid。(因为ip是会变的,不会做为唯一键)

3.也是socket比较难处理的一个问题:非正常断线,就是你拔掉网线的时候socket是检查不到断开的。在网上看了很多帖子:(1)设置超时时间

(2)设置keepAlive (3)设置心跳包。第一个超时时间是read的阻塞时间,并不是说socket的运行了这么久然后就超时断开。所以超时和断开没关系,

第二个keepAlive是十二分钟检测一次服务器的活动状态,个人觉得有点久。第三个正常断开还行,非正常断开(拔网线)就检测不出了。

所以我通过发送0给客户端等待客户端响应1回来,没有则判断为离线。等待响应用的是线程,不然如果断线则会卡在read方法那里。

 

最后java是开源,代码共享,互相进步,延续这种精神。如果有疑问可以加qq群124795403 交流。我不是大牛,我只是代码的搬运工。

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/songqiang2011/article/details/52669741

智能推荐

攻防世界_难度8_happy_puzzle_攻防世界困难模式攻略图文-程序员宅基地

文章浏览阅读645次。这个肯定是末尾的IDAT了,因为IDAT必须要满了才会开始一下个IDAT,这个明显就是末尾的IDAT了。,对应下面的create_head()代码。,对应下面的create_tail()代码。不要考虑爆破,我已经试了一下,太多情况了。题目来源:UNCTF。_攻防世界困难模式攻略图文

达梦数据库的导出(备份)、导入_达梦数据库导入导出-程序员宅基地

文章浏览阅读2.9k次,点赞3次,收藏10次。偶尔会用到,记录、分享。1. 数据库导出1.1 切换到dmdba用户su - dmdba1.2 进入达梦数据库安装路径的bin目录,执行导库操作  导出语句:./dexp cwy_init/[email protected]:5236 file=cwy_init.dmp log=cwy_init_exp.log 注释:   cwy_init/init_123..._达梦数据库导入导出

js引入kindeditor富文本编辑器的使用_kindeditor.js-程序员宅基地

文章浏览阅读1.9k次。1. 在官网上下载KindEditor文件,可以删掉不需要要到的jsp,asp,asp.net和php文件夹。接着把文件夹放到项目文件目录下。2. 修改html文件,在页面引入js文件:<script type="text/javascript" src="./kindeditor/kindeditor-all.js"></script><script type="text/javascript" src="./kindeditor/lang/zh-CN.js"_kindeditor.js

STM32学习过程记录11——基于STM32G431CBU6硬件SPI+DMA的高效WS2812B控制方法-程序员宅基地

文章浏览阅读2.3k次,点赞6次,收藏14次。SPI的详情简介不必赘述。假设我们通过SPI发送0xAA,我们的数据线就会变为10101010,通过修改不同的内容,即可修改SPI中0和1的持续时间。比如0xF0即为前半周期为高电平,后半周期为低电平的状态。在SPI的通信模式中,CPHA配置会影响该实验,下图展示了不同采样位置的SPI时序图[1]。CPOL = 0,CPHA = 1:CLK空闲状态 = 低电平,数据在下降沿采样,并在上升沿移出CPOL = 0,CPHA = 0:CLK空闲状态 = 低电平,数据在上升沿采样,并在下降沿移出。_stm32g431cbu6

计算机网络-数据链路层_接收方收到链路层数据后,使用crc检验后,余数为0,说明链路层的传输时可靠传输-程序员宅基地

文章浏览阅读1.2k次,点赞2次,收藏8次。数据链路层习题自测问题1.数据链路(即逻辑链路)与链路(即物理链路)有何区别?“电路接通了”与”数据链路接通了”的区别何在?2.数据链路层中的链路控制包括哪些功能?试讨论数据链路层做成可靠的链路层有哪些优点和缺点。3.网络适配器的作用是什么?网络适配器工作在哪一层?4.数据链路层的三个基本问题(帧定界、透明传输和差错检测)为什么都必须加以解决?5.如果在数据链路层不进行帧定界,会发生什么问题?6.PPP协议的主要特点是什么?为什么PPP不使用帧的编号?PPP适用于什么情况?为什么PPP协议不_接收方收到链路层数据后,使用crc检验后,余数为0,说明链路层的传输时可靠传输

软件测试工程师移民加拿大_无证移民,未受过软件工程师的教育(第1部分)-程序员宅基地

文章浏览阅读587次。软件测试工程师移民加拿大 无证移民,未受过软件工程师的教育(第1部分) (Undocumented Immigrant With No Education to Software Engineer(Part 1))Before I start, I want you to please bear with me on the way I write, I have very little gen...

随便推点

Thinkpad X250 secure boot failed 启动失败问题解决_安装完系统提示secureboot failure-程序员宅基地

文章浏览阅读304次。Thinkpad X250笔记本电脑,装的是FreeBSD,进入BIOS修改虚拟化配置(其后可能是误设置了安全开机),保存退出后系统无法启动,显示:secure boot failed ,把自己惊出一身冷汗,因为这台笔记本刚好还没开始做备份.....根据错误提示,到bios里面去找相关配置,在Security里面找到了Secure Boot选项,发现果然被设置为Enabled,将其修改为Disabled ,再开机,终于正常启动了。_安装完系统提示secureboot failure

C++如何做字符串分割(5种方法)_c++ 字符串分割-程序员宅基地

文章浏览阅读10w+次,点赞93次,收藏352次。1、用strtok函数进行字符串分割原型: char *strtok(char *str, const char *delim);功能:分解字符串为一组字符串。参数说明:str为要分解的字符串,delim为分隔符字符串。返回值:从str开头开始的一个个被分割的串。当没有被分割的串时则返回NULL。其它:strtok函数线程不安全,可以使用strtok_r替代。示例://借助strtok实现split#include <string.h>#include <stdio.h&_c++ 字符串分割

2013第四届蓝桥杯 C/C++本科A组 真题答案解析_2013年第四届c a组蓝桥杯省赛真题解答-程序员宅基地

文章浏览阅读2.3k次。1 .高斯日记 大数学家高斯有个好习惯:无论如何都要记日记。他的日记有个与众不同的地方,他从不注明年月日,而是用一个整数代替,比如:4210后来人们知道,那个整数就是日期,它表示那一天是高斯出生后的第几天。这或许也是个好习惯,它时时刻刻提醒着主人:日子又过去一天,还有多少时光可以用于浪费呢?高斯出生于:1777年4月30日。在高斯发现的一个重要定理的日记_2013年第四届c a组蓝桥杯省赛真题解答

基于供需算法优化的核极限学习机(KELM)分类算法-程序员宅基地

文章浏览阅读851次,点赞17次,收藏22次。摘要:本文利用供需算法对核极限学习机(KELM)进行优化,并用于分类。

metasploitable2渗透测试_metasploitable2怎么进入-程序员宅基地

文章浏览阅读1.1k次。一、系统弱密码登录1、在kali上执行命令行telnet 192.168.26.1292、Login和password都输入msfadmin3、登录成功,进入系统4、测试如下:二、MySQL弱密码登录:1、在kali上执行mysql –h 192.168.26.129 –u root2、登录成功,进入MySQL系统3、测试效果:三、PostgreSQL弱密码登录1、在Kali上执行psql -h 192.168.26.129 –U post..._metasploitable2怎么进入

Python学习之路:从入门到精通的指南_python人工智能开发从入门到精通pdf-程序员宅基地

文章浏览阅读257次。本文将为初学者提供Python学习的详细指南,从Python的历史、基础语法和数据类型到面向对象编程、模块和库的使用。通过本文,您将能够掌握Python编程的核心概念,为今后的编程学习和实践打下坚实基础。_python人工智能开发从入门到精通pdf

推荐文章

热门文章

相关标签