如何解决Java Socket通信技术收发线程互斥

Java Socket通信技术在很长的`时间里都在使用,在不少的程序员眼中都有很多高的评价。那么下面我们就看看如何才能掌握这门复杂的编程语言,希望大家在今后的Java Socket通信技术使用中有所收获。

如何解决Java Socket通信技术收发线程互斥

下面就是Java Socket通信技术在解决收发线程互斥的代码介绍。

age ;

rt ception;

rt tStream;

rt utStream;

rt SocketAddress;

rt et;

rt etException;

rt etTimeoutException;

rt leDateFormat;

rt ;

rt erties;

rt r;

rt rTask;

rt urrentHashMap;

rt Unit;

rt ition;

rt trantLock;

rt er;

19./**

20.*

title: socket通信包装类

21.*

Description:

22.*

CopyRight: CopyRight (c) 2009

23.*

Company:

24.*

Create date: 2009-10-14

25.*author sunnylocus

26. * v0.10 2009-10-14 初类

27.* v0.11 2009-11-12 对命令收发逻辑及收发线程互斥机制进行了优化,

处理命令速度由原来8~16个/秒提高到25~32个/秒

28.*/ public class SocketConnection {

ate volatile Socket socket;

ate int timeout = 1000*10; //超时时间,初始值10秒

ate boolean isLaunchHeartcheck = false;//是否已启动心跳检测

ate boolean isNetworkConnect = false; //网络是否已连接

ate static String host = "";

ate static int port;

ic InputStream inStream = null;

ic OutputStream outStream = null;

ate static Logger log =ogger

(s);

ate static SocketConnection socketConnection = null;

ate static r heartTimer=null;

40.//private final Map recMsgMap= Collections.

synchronizedMap(new HashMap());

ate final ConcurrentHashMap recMsgMap

= new ConcurrentHashMap();

ate static Thread receiveThread = null;

ate final ReentrantLock lock = new ReentrantLock();

ate SocketConnection(){

erties conf = new Properties();

{

(esourceAsStream

(""));

out = eOf(roperty("timeout"));

(roperty("ip"),eOf

(roperty("port")));

50.} catch(IOException e) {

l("socket初始化异常!",e);

w new RuntimeException("socket初始化异常,请检查配置参数");

53.}

54.}

55./**

56.* 单态模式

57.*/

ic static SocketConnection getInstance() {

(socketConnection==null) {

hronized(s) {

(socketConnection==null) {

etConnection = new SocketConnection();

rn socketConnection;

64.}

65.}

66.}

rn socketConnection;

68.}

ate void init(String host,int port) throws IOException {

SocketAddress addr = new InetSocketAddress(host,port);

et = new Socket();

hronized (this) {

("【准备与"+addr+"建立连接】");

ect(addr, timeout);

("【与"+addr+"连接已建立】");

ream = nputStream();

tream = utputStream();

cpNoDelay(true);//数据不作缓冲,立即发送

oLinger(true, 0);//socket关闭时,立即释放资源

eepAlive(true);

rafficClass(0x04|0x10);//高可靠性和最小延迟传输

tworkConnect=true;

iveThread = new Thread(new ReceiveWorker());

t();

=host;

=port;

(!isLaunchHeartcheck)

chHeartcheck();

89.}

90.}

91./**

92.* 心跳包检测

93.*/

ate void launchHeartcheck() {

(socket == null)

w new IllegalStateException("socket is not

established!");

tTimer = new Timer();

unchHeartcheck = true;

dule(new TimerTask() {

ic void run() {

ng msgStreamNo = treamNo("kq");

mstType =9999;//999-心跳包请求

leDateFormat dateformate = new SimpleDateFormat

("yyyyMMddHHmmss");

ng msgDateTime = at(new Date());

msgLength =38;//消息头长度

ng commandstr = "00" +msgLength + mstType + msgStreamNo;

("心跳检测包 -> IVR "+commandstr);

reconnCounter = 1;

e(true) {

ng responseMsg =null;

{

onseMsg = readReqMsg(commandstr);

113.} catch (IOException e) {

r("IO流异常",e);

nnCounter ++;

116.}

(responseMsg!=null) {

("心跳响应包 <- IVR "+responseMsg);

nnCounter = 1;

k;

121.} else {

nnCounter ++;

123.}

(reconnCounter >3) {//重连次数已达三次,判定网络连接中断,

重新建立连接。连接未被建立时不释放锁

nnectToCTCC(); break;

126.}

127.}

128.}

129.},1000 * 60*1,1000*60*2);

130.}

131./**

132.* 重连与目标IP建立重连

133.*/

ate void reConnectToCTCC() {

Thread(new Runnable(){

ic void run(){

("重新建立与"+host+":"+port+"的连接");

138.//清理工作,中断计时器,中断接收线程,恢复初始变量

el();

unchHeartcheck=false;

tworkConnect = false;

rrupt();

{

e();

145.} catch (IOException e1) {r("重连时,关闭socket连

接发生IO流异常",e1);}

146.//----------------

hronized(this){

(; ;){

{

entThread();

p(1000 * 1);

(host,port);

fyAll();

k ;

155.} catch (IOException e) {

r("重新建立连接未成功",e);

157.} catch (InterruptedException e){

r("重连线程中断",e);

159.}

160.}

161.}

162.}

163.})t();

164.}

165./**

166.* 发送命令并接受响应

167.* @param requestMsg

168.* @return

169.* @throws SocketTimeoutException

170.* @throws IOException

171.*/

ic String readReqMsg(String requestMsg) throws IOException {

(requestMsg ==null) {

rn null;

175.}

(!isNetworkConnect) {

hronized(this){

{

(1000*5); //等待5秒,如果网络还没有恢复,抛出IO流异常

(!isNetworkConnect) {

w new IOException("网络连接中断!");

182.}

183.} catch (InterruptedException e) {

r("发送线程中断",e);

185.}

186.}

187.}

ng msgNo = tring(8, 8 + 24);//读取流水号

tream = utputStream();

e(ytes());

h();

ition msglock = ondition(); //消息锁

193.//注册等待接收消息

(msgNo, msglock);

{

();

t(timeout,ISECONDS);

198.} catch (InterruptedException e) {

r("发送线程中断",e);

200.} finally {

ck();

202.}

ct respMsg = ve(msgNo); //响应信息

(respMsg!=null &&(respMsg != msglock)) {

205.//已经接收到消息,注销等待,成功返回消息

rn (String) respMsg;

207.} else {

r(msgNo+" 超时,未收到响应消息");

w new SocketTimeoutException(msgNo+" 超时,未收到响应消息");

210.}

211.}

ic void finalize() {

(socket != null) {

{

e();

216.} catch (IOException e) {

tStackTrace();

218.}

219.}

220.}

221.//消息接收线程

ate class ReceiveWorker implements Runnable {

ng intStr= null;

ic void run() {

e(!rrupted()){

{

[] headBytes = new byte[4];

((headBytes)==-1){

("读到流未尾,对方已关闭流!");

nnectToCTCC();//读到流未尾,对方已关闭流

rn;

232.}

[] tmp =new byte[4];

= headBytes;

ng tempStr = new String(tmp)();

(tempStr==null || ls("")) {

r("received message is null");

inue;

239.}

tr = new String(tmp);

totalLength =eInt(intStr);

242.//----------------

[] msgBytes = new byte[totalLength-4];

(msgBytes);

ng resultMsg = new String(headBytes)+ new

String(msgBytes);

246.//抽出消息ID

ng msgNo = tring(8, 8 + 24);

ition msglock =(Condition) (msgNo);

(msglock ==null) {

(msgNo+"序号可能已被注销!响应消息丢弃");

ve(msgNo);

inue;

253.}

(msgNo, resultMsg);

{

();

alAll();

258.}finally {

ck();

260.}

261.}catch(SocketException e){

r("服务端关闭socket",e);

nnectToCTCC();

264.} catch(IOException e) {

r("接收线程读取响应数据时发生IO流异常",e);

266.} catch(NumberFormatException e){

r("收到没良心包,String转int异常,异常字符:"+intStr);

268.}

269.}

270.}

271.}

272.}