Java_io体系之PipedWriter、PipedReader简介、走进源码及示例——14-程序员宅基地

技术标签: java  

Java_io体系之PipedWriter、PipedReader简介、走进源码及示例——14


——管道字符输出流、必须建立在管道输入流之上、所以先介绍管道字符输出流。可以先看示例或者总结、总结写的有点Q、不喜可无视、有误的地方指出则不胜感激。


一:PipedWriter


1、类功能简介:


管道字符输出流、用于将当前线程的指定字符写入到与此线程对应的管道字符输入流中去、所以PipedReader(pr)、PipedWriter(pw)必须配套使用、缺一不可。管道字符输出流的本质就是调用pr中的方法将字符或者字符数组写入到pr中、这一点是与众不同的地方。所以pw中的方法很少也很简单、主要就是负责将传入的pr与本身绑定、配对使用、然后就是调用绑定的pr的写入方法、将字符或者字符数组写入到pr的缓存字符数组中。


2、PipedWriter API简介:


A:关键字

    private PipedReader sink;	与此PipedWriter绑定的PipedReader

    
    private boolean closed = false;		标示此流是否关闭。

B:构造方法

	PipedWriter(PipedReader snk)	根据传入的PipedReader构造pw、并将pr与此pw绑定
    
    PipedWriter()	创建一个pw、在使用之前必须与一个pr绑定



C:一般方法

	synchronized void connect(PipedReader snk)		将此pw与一个pr绑定
	
	void close()	关闭此流。
	
	synchronized void connect(PipedReader snk)		将此pw与一个pr绑定
	
	synchronized void flush()	flush此流、唤醒pr中所有等待的方法。
	
	void write(int c)	将一个整数写入到与此pw绑定的pr的缓存字符数组buf中去
	
	void write(char cbuf[], int off, int len)	将cbuf的一部分写入pr的buf中去


3、源码分析


package com.chy.io.original.code;

import java.io.IOException;

public class PipedWriter extends Writer {
	
	//与此PipedWriter绑定的PipedReader
    private PipedReader sink;

    //标示此流是否关闭。
    private boolean closed = false;

    /**
     * 根据传入的PipedReader构造pw、并将pr与此pw绑定
     */
    public PipedWriter(PipedReader snk)  throws IOException {
    	connect(snk);
    }
    
    /**
     * 创建一个pw、在使用之前必须与一个pr绑定
     */
    public PipedWriter() {
    }
    
    /**
     * 将此pw与一个pr绑定
     */
    public synchronized void connect(PipedReader snk) throws IOException {
        if (snk == null) {
            throw new NullPointerException();
        } else if (sink != null || snk.connected) {
		    throw new IOException("Already connected");
		} else if (snk.closedByReader || closed) {
	            throw new IOException("Pipe closed");
	    }
	        
		sink = snk;
		snk.in = -1;
		snk.out = 0;
        snk.connected = true;
    }

    /**
     * 将一个整数写入到与此pw绑定的pr的缓存字符数组buf中去
     */
    public void write(int c)  throws IOException {
        if (sink == null) {
            throw new IOException("Pipe not connected");
        }
        sink.receive(c);
    }

    /**
     * 将cbuf的一部分写入pr的buf中去
     */
    public void write(char cbuf[], int off, int len) throws IOException {
        if (sink == null) {
            throw new IOException("Pipe not connected");
        } else if ((off | len | (off + len) | (cbuf.length - (off + len))) < 0) {
        	throw new IndexOutOfBoundsException();
		}
		sink.receive(cbuf, off, len);
    }

    /**
     * flush此流、唤醒pr中所有等待的方法。
     */
    public synchronized void flush() throws IOException {
		if (sink != null) {
	            if (sink.closedByReader || closed) {
	                throw new IOException("Pipe closed");
	            }            
	            synchronized (sink) {
	                sink.notifyAll();
	            }
		}
    }

    /**
     * 关闭此流。
     */
    public void close()  throws IOException {
        closed = true;
		if (sink != null) {
		    sink.receivedLast();
		}
    }
}

4、实例演示:


因为PipedWriter必须与PipedReader结合使用、所以将两者的示例放在一起。

二:PipedReader


1、类功能简介:


管道字符输入流、用于读取对应绑定的管道字符输出流写入其内置字符缓存数组buffer中的字符、借此来实现线程之间的通信、pr中专门有两个方法供pw调用、receive(char c)、receive(char[] b, int off, intlen)、使得pw可以将字符或者字符数组写入pr的buffer中、

2、PipedReader API简介:


A:关键字

	boolean closedByWriter = false;		标记PipedWriter是否关闭
	
    boolean closedByReader = false;		标记PipedReader是否关闭
    
    boolean connected = false;			标记PipedWriter与标记PipedReader是否关闭的连接是否关闭

    Thread readSide; 	拥有PipedReader的线程
    
    Thread writeSide;	拥有PipedWriter的线程

    private static final int DEFAULT_PIPE_SIZE = 1024;		用于循环存放PipedWriter写入的字符数组的默认大小

    char buffer[];		用于循环存放PipedWriter写入的字符数组

    int in = -1;	buf中下一个存放PipedWriter调用此PipedReader的receive(int c)时、c在buf中存放的位置的下标。此为初始状态、即buf中没有字符

    int out = 0;	buf中下一个被读取的字符的下标


B:构造方法

	PipedReader(PipedWriter src)	使用默认的buf的大小和传入的pw构造pr
	
	PipedReader(PipedWriter src, int pipeSize)		使用指定的buf的大小和传入的pw构造pr
	
	PipedReader()		使用默认大小构造pr
	
	PipedReader(int pipeSize)		使用指定大小构造pr


C:一般方法

	void close()	清空buf中数据、关闭此流。
	
	void connect(PipedWriter src)	调用与此流绑定的pw的connect方法、将此流与对应的pw绑定
	
	synchronized boolean ready()	查看此流是否可读
	
	synchronized int read()		从buf中读取一个字符、以整数形式返回
	
	synchronized int read(char cbuf[], int off, int len)	将buf中读取一部分字符到cbuf中。
	
	synchronized void receive(int c)	pw调用此流的此方法、向pr的buf以整数形式中写入一个字符。
	
	synchronized void receive(char c[], int off, int len)	将c中一部分字符写入到buf中。
	
	synchronized void receivedLast()	提醒所有等待的线程、已经接收到了最后一个字符。


3、源码分析


package com.chy.io.original.code;

import java.io.IOException;

public class PipedReader extends Reader {
    boolean closedByWriter = false;
    boolean closedByReader = false;
    boolean connected = false;

    Thread readSide;
    Thread writeSide;

   /** 
    * 用于循环存放PipedWriter写入的字符数组的默认大小
    */ 
    private static final int DEFAULT_PIPE_SIZE = 1024;

    /**
     * 用于循环存放PipedWriter写入的字符数组
     */
    char buffer[];

    /**
     * buf中下一个存放PipedWriter调用此PipedReader的receive(int c)时、c在buf中存放的位置的下标。
     * in为-1时、说明buf中没有可读取字符、in=out时已经存满了。
     */
    int in = -1;

    /**
     * buf中下一个被读取的字符的下标
     */
    int out = 0;

    /**
     * 使用默认的buf的大小和传入的pw构造pr
     */
    public PipedReader(PipedWriter src) throws IOException {
    	this(src, DEFAULT_PIPE_SIZE);
    }

    /**
     * 使用指定的buf的大小和传入的pw构造pr
     */
    public PipedReader(PipedWriter src, int pipeSize) throws IOException {
		initPipe(pipeSize);
		connect(src);
    }


    /**
     * 使用默认大小构造pr
     */
    public PipedReader() {
    	initPipe(DEFAULT_PIPE_SIZE);
    }

    /**
     * 使用指定大小构造pr
     */
    public PipedReader(int pipeSize) {
    	initPipe(pipeSize);
    }

    //初始化buf大小
    private void initPipe(int pipeSize) {
		if (pipeSize <= 0) {
		    throw new IllegalArgumentException("Pipe size <= 0");
		}
		buffer = new char[pipeSize];
    }

    /**
     * 调用与此流绑定的pw的connect方法、将此流与对应的pw绑定
     */
    public void connect(PipedWriter src) throws IOException {
    	src.connect(this);
    }
    
    /**
     * pw调用此流的此方法、向pr的buf以整数形式中写入一个字符。
     */
    synchronized void receive(int c) throws IOException {
        if (!connected) {
            throw new IOException("Pipe not connected");
        } else if (closedByWriter || closedByReader) {
        	throw new IOException("Pipe closed");
        } else if (readSide != null && !readSide.isAlive()) {
            throw new IOException("Read end dead");
        }

		writeSide = Thread.currentThread();
		while (in == out) {
		    if ((readSide != null) && !readSide.isAlive()) {
		    	throw new IOException("Pipe broken");
		    }
		    //buf中写入的被读取完、唤醒所有此对象监控的线程其他方法、如果一秒钟之后还是满值、则再次唤醒其他方法、直到buf中被读取。
		    notifyAll();	
		    try {
		        wait(1000);
		    } catch (InterruptedException ex) {
		    	throw new java.io.InterruptedIOException();
		    }
		}
		//buf中存放第一个字符时、将字符在buf中存放位置的下标in初始化为0、读取的下标也初始化为0、准备接受写入的第一个字符。
		if (in < 0) {
		    in = 0;
		    out = 0;
		}
		buffer[in++] = (char) c;
		//如果buf中放满了、则再从头开始存放。
		if (in >= buffer.length) {
		    in = 0;
		}
    }

    /**
     * 将c中一部分字符写入到buf中。
     */
    synchronized void receive(char c[], int off, int len)  throws IOException {
		while (--len >= 0) {
		    receive(c[off++]);
		}
    }

    /**
     * 提醒所有等待的线程、已经接收到了最后一个字符、PipedWriter已关闭。用于PipedWriter的close()方法.
     */
    synchronized void receivedLast() {
		closedByWriter = true;
		notifyAll();
    }

    /**
     * 从buf中读取一个字符、以整数形式返回
     */
    public synchronized int read()  throws IOException {
        if (!connected) {
            throw new IOException("Pipe not connected");
        } else if (closedByReader) {
		    throw new IOException("Pipe closed");
		} else if (writeSide != null && !writeSide.isAlive() && !closedByWriter && (in < 0)) {
            throw new IOException("Write end dead");
        }

        readSide = Thread.currentThread();
		int trials = 2;
		while (in < 0) {
		    if (closedByWriter) { 
			/* closed by writer, return EOF */
			return -1;
		    }
		    if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
			throw new IOException("Pipe broken");
		    }
	            /* might be a writer waiting */
		    notifyAll();
		    try {
		        wait(1000);
		    } catch (InterruptedException ex) {
			throw new java.io.InterruptedIOException();
		    }
	 	}
		int ret = buffer[out++];
		if (out >= buffer.length) {
		    out = 0;
		}
		if (in == out) {
	            /* now empty */
		    in = -1;		
		}
		return ret;
    }

    /**
     * 将buf中读取一部分字符到cbuf中。
     */
    public synchronized int read(char cbuf[], int off, int len)  throws IOException {
        if (!connected) {
            throw new IOException("Pipe not connected");
        } else if (closedByReader) {
		    throw new IOException("Pipe closed");
		} else if (writeSide != null && !writeSide.isAlive() && !closedByWriter && (in < 0)) {
            throw new IOException("Write end dead");
        }

        if ((off < 0) || (off > cbuf.length) || (len < 0) ||
            ((off + len) > cbuf.length) || ((off + len) < 0)) {
		    throw new IndexOutOfBoundsException();
		} else if (len == 0) {
		    return 0;
		}

        /* possibly wait on the first character */
		int c = read();		
		if (c < 0) {
		    return -1;
		}
		cbuf[off] =  (char)c;
		int rlen = 1;
		while ((in >= 0) && (--len > 0)) {
		    cbuf[off + rlen] = buffer[out++];
		    rlen++;
		    //如果读取的下一个字符下标大于buffer的size、则重置out、从新开始从第一个开始读取。
		    if (out >= buffer.length) {
		    	out = 0;
		    }
		    //如果下一个写入字符的下标与下一个被读取的下标相同、则清空buf
		    if (in == out) {
	                /* now empty */
		    	in = -1;	
		    }
		}
		return rlen;
    }

    /**
     * 查看此流是否可读、看各个线程是否关闭、以及buffer中是否有可供读取的字符。
     */
    public synchronized boolean ready() throws IOException {
        if (!connected) {
            throw new IOException("Pipe not connected");
        } else if (closedByReader) {
	    throw new IOException("Pipe closed");
	} else if (writeSide != null && !writeSide.isAlive()
                   && !closedByWriter && (in < 0)) {
            throw new IOException("Write end dead");
        }
        if (in < 0) {
            return false;
        } else {
            return true;
        }
    }
 
    /**
     * 清空buf中数据、关闭此流。
     */
    public void close()  throws IOException {
		in = -1;
		closedByReader = true;
    }
}


4、实例演示:


用于发送字符的线程:CharSenderThread

package com.chy.io.original.thread;

import java.io.IOException;
import java.io.PipedWriter;

@SuppressWarnings("all")
public class CharSenderThread implements Runnable {
	private PipedWriter pw = new PipedWriter();
	
	public PipedWriter getPipedWriter(){
		return pw;
	}
	@Override
	public void run() {
		//sendOneChar();
		//sendShortMessage();
		sendLongMessage();
	}

	private void sendOneChar(){
		try {
			pw.write("a".charAt(0));
			pw.flush();
			pw.close();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
	private void sendShortMessage() {
		try {
			pw.write("this is a short message from CharSenderThread !".toCharArray());
			pw.flush();
			pw.close();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
	private void sendLongMessage(){
		try {
			char[] b = new char[1028];
			//生成一个长度为1028的字符数组、前1020个是1、后8个是2。
			for(int i=0; i<1020; i++){
				b[i] = 'a';
			}
			for (int i = 1020; i <1028; i++) {
				b[i] = 'b';
			}
			pw.write(b);
			pw.flush();
			pw.close();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
}

用于接收字符的线程: CharReceiveThread

package com.chy.io.original.thread;

import java.io.IOException;
import java.io.PipedReader;

@SuppressWarnings("all")
public class CharReceiverThread extends Thread {
	
	private PipedReader pr = new PipedReader();
	
	public PipedReader getPipedReader(){
		return pr;
	}
	@Override
	public void run() {
		//receiveOneChar();
		//receiveShortMessage();
		receiverLongMessage();
	}
	
	private void receiveOneChar(){
		try {
			int n = pr.read();
			System.out.println(n);
			pr.close();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
	private void receiveShortMessage() {
		try {
			char[] b = new char[1024];
			int n = pr.read(b);
			System.out.println(new String(b, 0, n));
			pr.close();
			
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
	private void receiverLongMessage(){
		try {
			char[] b = new char[2048];
			int count = 0;
			while(true){
				count = pr.read(b); 
				for (int i = 0; i < count; i++) {
					System.out.print(b[i]);
				}
				if(count == -1)
					break;
			}
			pr.close();
			
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
}

启动类:PipedWriterAndPipedReaderTest

package com.chy.io.original.test;

import java.io.IOException;
import java.io.PipedReader;
import java.io.PipedWriter;

import com.chy.io.original.thread.CharReceiverThread;
import com.chy.io.original.thread.CharSenderThread;

public class PipedWriterAndPipedReaderTest {
	public static void main(String[] args) throws IOException{
		CharSenderThread cst = new CharSenderThread();
		CharReceiverThread crt = new CharReceiverThread();
		PipedWriter pw = cst.getPipedWriter();
		PipedReader pr = crt.getPipedReader();
		
		pw.connect(pr);
		
		/**
		 * 想想为什么下面这样写会报Piped not connect异常 ?
		 */
		//new Thread(new CharSenderThread()).start();
		//new CharReceiverThread().start();
		
		new Thread(cst).start();
		crt.start();
	}
}

两个线程中分别有三个方法、可以对应的每次放开一对方法来测试、还有这里最后一个读取1028个字符的方法用了死循环来读取、可以试试当不用死循环来读取会有什么不一样的效果?初始化字符的时候要用char = 'a' 而不是cahr = "a"、可自己想原因。。。

总结:


PipedReader、PipedWriter两者的结合如鸳鸯一般、离开哪一方都不能继续存在、同时又如连理枝一般、PipedWriter先通过connect(PipedReader sink)来确定关系、并初始化PipedReader状态、告诉PipedReader只能属于这个PipedWriter、connect =true、当想赠与PipedReader字符时、就直接调用receive(char c) 、receive(char[] b, int off, int len)来将字符或者字符数组放入pr的存折buffer中。站在PipedReader角度上、看上哪个PipedWriter时就暗示pw、将主动权交给pw、调用pw的connect将自己给他去登记。当想要花(将字符读取到程序中)字符了就从buffer中拿、但是自己又没有本事挣字符、所以当buffer中没有字符时、自己就等着、并且跟pw讲没有字符了、pw就会向存折(buffer)中存字符、当然、pw不会一直不断往里存、当存折是空的时候也不会主动存、怕花冒、就等着pr要、要才存。过到最后两个只通过buffer来知道对方的存在与否、每次从buffer中存或者取字符时都会看看对方是否安康、若安好则继续生活、若一方不在、则另一方也不愿独存!


更多IO内容:java_io 体系之目录


版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/iteye_563/article/details/82552845

智能推荐

oracle 12c 集群安装后的检查_12c查看crs状态-程序员宅基地

文章浏览阅读1.6k次。安装配置gi、安装数据库软件、dbca建库见下:http://blog.csdn.net/kadwf123/article/details/784299611、检查集群节点及状态:[root@rac2 ~]# olsnodes -srac1 Activerac2 Activerac3 Activerac4 Active[root@rac2 ~]_12c查看crs状态

解决jupyter notebook无法找到虚拟环境的问题_jupyter没有pytorch环境-程序员宅基地

文章浏览阅读1.3w次,点赞45次,收藏99次。我个人用的是anaconda3的一个python集成环境,自带jupyter notebook,但在我打开jupyter notebook界面后,却找不到对应的虚拟环境,原来是jupyter notebook只是通用于下载anaconda时自带的环境,其他环境要想使用必须手动下载一些库:1.首先进入到自己创建的虚拟环境(pytorch是虚拟环境的名字)activate pytorch2.在该环境下下载这个库conda install ipykernelconda install nb__jupyter没有pytorch环境

国内安装scoop的保姆教程_scoop-cn-程序员宅基地

文章浏览阅读5.2k次,点赞19次,收藏28次。选择scoop纯属意外,也是无奈,因为电脑用户被锁了管理员权限,所有exe安装程序都无法安装,只可以用绿色软件,最后被我发现scoop,省去了到处下载XXX绿色版的烦恼,当然scoop里需要管理员权限的软件也跟我无缘了(譬如everything)。推荐添加dorado这个bucket镜像,里面很多中文软件,但是部分国外的软件下载地址在github,可能无法下载。以上两个是官方bucket的国内镜像,所有软件建议优先从这里下载。上面可以看到很多bucket以及软件数。如果官网登陆不了可以试一下以下方式。_scoop-cn

Element ui colorpicker在Vue中的使用_vue el-color-picker-程序员宅基地

文章浏览阅读4.5k次,点赞2次,收藏3次。首先要有一个color-picker组件 <el-color-picker v-model="headcolor"></el-color-picker>在data里面data() { return {headcolor: ’ #278add ’ //这里可以选择一个默认的颜色} }然后在你想要改变颜色的地方用v-bind绑定就好了,例如:这里的:sty..._vue el-color-picker

迅为iTOP-4412精英版之烧写内核移植后的镜像_exynos 4412 刷机-程序员宅基地

文章浏览阅读640次。基于芯片日益增长的问题,所以内核开发者们引入了新的方法,就是在内核中只保留函数,而数据则不包含,由用户(应用程序员)自己把数据按照规定的格式编写,并放在约定的地方,为了不占用过多的内存,还要求数据以根精简的方式编写。boot启动时,传参给内核,告诉内核设备树文件和kernel的位置,内核启动时根据地址去找到设备树文件,再利用专用的编译器去反编译dtb文件,将dtb还原成数据结构,以供驱动的函数去调用。firmware是三星的一个固件的设备信息,因为找不到固件,所以内核启动不成功。_exynos 4412 刷机

Linux系统配置jdk_linux配置jdk-程序员宅基地

文章浏览阅读2w次,点赞24次,收藏42次。Linux系统配置jdkLinux学习教程,Linux入门教程(超详细)_linux配置jdk

随便推点

matlab(4):特殊符号的输入_matlab微米怎么输入-程序员宅基地

文章浏览阅读3.3k次,点赞5次,收藏19次。xlabel('\delta');ylabel('AUC');具体符号的对照表参照下图:_matlab微米怎么输入

C语言程序设计-文件(打开与关闭、顺序、二进制读写)-程序员宅基地

文章浏览阅读119次。顺序读写指的是按照文件中数据的顺序进行读取或写入。对于文本文件,可以使用fgets、fputs、fscanf、fprintf等函数进行顺序读写。在C语言中,对文件的操作通常涉及文件的打开、读写以及关闭。文件的打开使用fopen函数,而关闭则使用fclose函数。在C语言中,可以使用fread和fwrite函数进行二进制读写。‍ Biaoge 于2024-03-09 23:51发布 阅读量:7 ️文章类型:【 C语言程序设计 】在C语言中,用于打开文件的函数是____,用于关闭文件的函数是____。

Touchdesigner自学笔记之三_touchdesigner怎么让一个模型跟着鼠标移动-程序员宅基地

文章浏览阅读3.4k次,点赞2次,收藏13次。跟随鼠标移动的粒子以grid(SOP)为partical(SOP)的资源模板,调整后连接【Geo组合+point spirit(MAT)】,在连接【feedback组合】适当调整。影响粒子动态的节点【metaball(SOP)+force(SOP)】添加mouse in(CHOP)鼠标位置到metaball的坐标,实现鼠标影响。..._touchdesigner怎么让一个模型跟着鼠标移动

【附源码】基于java的校园停车场管理系统的设计与实现61m0e9计算机毕设SSM_基于java技术的停车场管理系统实现与设计-程序员宅基地

文章浏览阅读178次。项目运行环境配置:Jdk1.8 + Tomcat7.0 + Mysql + HBuilderX(Webstorm也行)+ Eclispe(IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持)。项目技术:Springboot + mybatis + Maven +mysql5.7或8.0+html+css+js等等组成,B/S模式 + Maven管理等等。环境需要1.运行环境:最好是java jdk 1.8,我们在这个平台上运行的。其他版本理论上也可以。_基于java技术的停车场管理系统实现与设计

Android系统播放器MediaPlayer源码分析_android多媒体播放源码分析 时序图-程序员宅基地

文章浏览阅读3.5k次。前言对于MediaPlayer播放器的源码分析内容相对来说比较多,会从Java-&amp;amp;gt;Jni-&amp;amp;gt;C/C++慢慢分析,后面会慢慢更新。另外,博客只作为自己学习记录的一种方式,对于其他的不过多的评论。MediaPlayerDemopublic class MainActivity extends AppCompatActivity implements SurfaceHolder.Cal..._android多媒体播放源码分析 时序图

java 数据结构与算法 ——快速排序法-程序员宅基地

文章浏览阅读2.4k次,点赞41次,收藏13次。java 数据结构与算法 ——快速排序法_快速排序法