当前位置: 首页 > 图文教程 > 网络编程 > Javascript > 简要介绍实现多线程环形缓冲的方法

Javascript
JS getMonth()日期函数的值域是0-11
jQuery 处理网页内容的实现代码
jQuery 树形结构的选择器
jQuery 处理表单元素的代码
JQuery 动画卷页 返回顶部 动画特效(兼容Chrome)
JavaScript 10件让人费解的事情
类似GMAIL的Ajax信息反馈显示
两个比较有用的Javascript工具函数代码
JavaScript Timer实现代码
JavaScript 学习技巧
JavaScript 题型问答有答案参考
js删除select中重复项的实现代码
javascript中的链式调用
JavaScript DOM学习第一章 W3C DOM简介
JavaScript DOM 学习第二章 编辑文本
JavaScript DOM 学习第三章 内容表格
JavaScript DOM学习第四章 getElementByTagNames
JavaScript DOM 学习第五章 表单简介
JavaScript DOM学习第六章 表单实例
JavaScript DOM 学习第七章 表单的扩展

Javascript 中的 简要介绍实现多线程环形缓冲的方法


出处:互联网   整理: 软晨网(RuanChen.com)   发布: 2009-09-28   浏览: 163 ::
收藏到网摘: n/a

 

我平时比较喜欢从网上听歌,有些链接下载速度太慢了。如果用HttpURLConnection类的方法打开连接,然后用InputStream类获得输入流,再用BufferedInputStream构造出带缓冲区的输入流,如果网速太慢的话,无论缓冲区设置多大,听起来都是断断续续的,达不到真正缓冲的目的。于是尝试编写代码实现用缓冲方式读取远程文件,以下贴出的代码是我写的MP3解码器的一部分。我是不怎么赞同使用多线程下载的,加之有的链接下载速度本身就比较快,所以在下载速度足够的情况下,就让下载线程退出,直到只剩下一个下载线程。当然,多线程中令人头痛的死锁问题、HttpURLConnection的超时阻塞问题都会使代码看起来异常复杂。

 

简要介绍一下实现多线程环形缓冲的方法。将缓冲区buf[]分为16块,每块32K,下载线程负责向缓冲区写数据,每次写一块;读线程(BuffRandAcceURL类)每次读小于32K的任意字节。同步描述:写/写互斥等待空闲块;写/写并发填写buf[];读/写并发使用buf[]。

 

经过我很长一段时间使用,我认为比较满意地实现了我的目标,同其它MP3播放器对比,我的这种方法能够比较流畅、稳定地下载并播放。我把实现多线程下载缓冲的方法写出来,不足之处恳请批评指正。

 

一、HttpReader类功能:HTTP协议从指定URL读取数据

 

/** *//**
* author by http://www.bt285.cn http://www.5a520.cn
*/
package instream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
public final class HttpReader { public static final int MAX_RETRY = 10; private static long content_length; private URL url; private HttpURLConnection httpConnection; private InputStream in_stream; private long cur_pos; //用于决定seek方法中是否执行文件定位 private int connect_timeout; private int read_timeout; public HttpReader(URL u) { this(u, 5000, 5000); } public HttpReader(URL u, int connect_timeout, int read_timeout) { this.connect_timeout = connect_timeout; this.read_timeout = read_timeout; url = u; if (content_length == 0) { int retry = 0; while (retry < HttpReader.MAX_RETRY) try { this.seek(0); content_length = httpConnection.getContentLength(); break; } catch (Exception e) { retry++; } } } public static long getContentLength() { return content_length; } public int read(byte[] b, int off, int len) throws IOException { int r = in_stream.read(b, off, len); cur_pos += r; return r; } public int getData(byte[] b, int off, int len) throws IOException { int r, rema = len; while (rema > 0) { if ((r = in_stream.read(b, off, rema)) == -1) { return -1; } rema -= r; off += r; cur_pos += r; } return len; } public void close() { if (httpConnection != null) { httpConnection.disconnect(); httpConnection = null; } if (in_stream != null) { try { in_stream.close(); } catch (IOException e) {} in_stream = null; } url = null; } /**//* * 抛出异常通知再试 * 响应码503可能是由某种暂时的原因引起的,例如同一IP频繁的连接请求可能遭服务器拒绝 */ public void seek(long start_pos) throws IOException { if (start_pos == cur_pos && in_stream != null) return; if (httpConnection != null) { httpConnection.disconnect(); httpConnection = null; } if (in_stream != null) { in_stream.close(); in_stream = null; } httpConnection = (HttpURLConnection) url.openConnection(); httpConnection.setConnectTimeout(connect_timeout); httpConnection.setReadTimeout(read_timeout); String sProperty = "bytes=" + start_pos + "-"; httpConnection.setRequestProperty("Range", sProperty); //httpConnection.setRequestProperty("Connection", "Keep-Alive"); int responseCode = httpConnection.getResponseCode(); if (responseCode < 200 || responseCode >= 300) { try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } throw new IOException("HTTP responseCode="+responseCode); } in_stream = httpConnection.getInputStream(); cur_pos = start_pos; }
}

 

二、IWriterCallBack接口功能:实现读/写通信。

 

package instream;
public interface IWriterCallBack { public boolean tryWriting(Writer w) throws InterruptedException; public void updateBuffer(int i, int len); public void updateWriterCount(); public void terminateWriters();
}

 

 

三、Writer类:下载线程,负责向buf[]写数据。

 

/** *//**
* http://www.bt285.cn http://www.5a520.cn
*/
package instream;
import java.io.IOException;
import java.net.URL;
public final class Writer implements Runnable { private static boolean isalive = true; private byte[] buf; private IWriterCallBack icb; protected int index; //buf[]内"块"索引号 protected long start_pos; //index对应的文件位置(相对于文件首的偏移量) protected int await_count; //用于判断:下载速度足够就退出一个"写"线程 private HttpReader hr; public Writer(IWriterCallBack call_back, URL u, byte[] b, int i) { hr = new HttpReader(u); if(HttpReader.getContentLength() == 0) //实例化HttpReader对象都不成功 return; icb = call_back; buf = b; Thread t = new Thread(this,"dt_"+i); t.setPriority(Thread.NORM_PRIORITY + 1); t.start(); } public void run() { int write_bytes=0, write_pos=0, rema = 0, retry = 0; boolean cont = true; while (cont) { try { // 1.等待空闲块 if(retry == 0) { if (icb.tryWriting(this) == false) break; write_bytes = 0; rema = BuffRandAcceURL.UNIT_LENGTH; write_pos = index << BuffRandAcceURL.UNIT_LENGTH_BITS; } // 2.定位 hr.seek(start_pos); // 3.下载"一块" int w; while (rema > 0 && isalive) { w = (rema < 2048) ? rema : 2048; //每次读几K合适? if ((w = hr.read(buf, write_pos, w)) == -1) { cont = false; break; } rema -= w; write_pos += w; start_pos += w; write_bytes += w; } //4.通知"读"线程 retry = 0; icb.updateBuffer(index, write_bytes); } catch (InterruptedException e) { isalive = false; icb.terminateWriters(); break; } catch (IOException e) { if(++retry == HttpReader.MAX_RETRY) { isalive = false; icb.terminateWriters(); break; } } } icb.updateWriterCount(); try { hr.close(); } catch (Exception e) {} hr = null; buf = null; icb = null; }
}

 

四、IRandomAccess接口:

 

随机读取文件接口,BuffRandAcceURL类和BuffRandAcceFile类实现接口方法。BuffRandAcceFile类实现读取本地磁盘文件,这儿就不给出其源码了。

 

package instream;
public interface IRandomAccess { public int read() throws Exception; public int read(byte b[]) throws Exception; public int read(byte b[], int off, int len) throws Exception; public int dump(int src_off, byte b[], int dst_off, int len) throws Exception; public void seek(long pos) throws Exception; public long length(); public long getFilePointer(); public void close();
}

 

五、BuffRandAcceURL类功能:创建下载线程;read方法从buf[]读数据。

 

关键是如何简单有效防止死锁?以下只是我的一次尝试,请指正。

 

/** *//**
* http://www.5a520.cn http://www.bt285.cn
*/
package instream;
import java.net.URL;
import java.net.URLDecoder;
import decode.Header;
import tag.MP3Tag;
import tag.TagThread;
public final class BuffRandAcceURL implements IRandomAccess, IWriterCallBack { public static final int UNIT_LENGTH_BITS = 15; //32K public static final int UNIT_LENGTH = 1 << UNIT_LENGTH_BITS; public static final int BUF_LENGTH = UNIT_LENGTH << 4; //16块 public static final int UNIT_COUNT = BUF_LENGTH >> UNIT_LENGTH_BITS; public static final int BUF_LENGTH_MASK = (BUF_LENGTH - 1); private static final int MAX_WRITER = 8; private static long file_pointer; private static int read_pos; private static int fill_bytes; private static byte[] buf; //同时也作读写同步锁:buf.wait()/buf.notify() private static int[] buf_bytes; private static int buf_index; private static int alloc_pos; private static URL url = null; private static boolean isalive = true; private static int writer_count; private static int await_count; private long file_length; private long frame_bytes; public BuffRandAcceURL(String sURL) throws Exception { this(sURL,MAX_WRITER); } public BuffRandAcceURL(String sURL, int download_threads) throws Exception { buf = new byte[BUF_LENGTH]; buf_bytes = new int[UNIT_COUNT]; url = new URL(sURL); //创建线程以异步方式解析ID3 new TagThread(url); //打印当前文件名 try { String s = URLDecoder.decode(sURL, "GBK"); System.out.println("start>> " + s.substring(s.lastIndexOf("/") + 1)); s = null; } catch (Exception e) { System.out.println("start>> " + sURL); } //创建"写"线程 for(int i = 0; i < download_threads; i++) new Writer(this, url, buf, i+1); frame_bytes = file_length = HttpReader.getContentLength(); if(file_length == 0) { Header.strLastErr = "连接URL出错,重试 " + HttpReader.MAX_RETRY + " 次后放弃。"; throw new Exception("retry " + HttpReader.MAX_RETRY); } writer_count = download_threads; //缓冲 try_cache(); //跳过ID3 v2 MP3Tag mP3Tag = new MP3Tag(); int v2_size = mP3Tag.checkID3V2(buf,0); if (v2_size > 0) { frame_bytes -= v2_size; //seek(v2_size): fill_bytes -= v2_size; file_pointer = v2_size; read_pos = v2_size; read_pos &= BUF_LENGTH_MASK; int units = v2_size >> UNIT_LENGTH_BITS; for(int i = 0; i < units; i++) { buf_bytes[i] = 0; this.notifyWriter(); } buf_bytes[units] -= v2_size; this.notifyWriter(); } mP3Tag = null; } private void try_cache() throws InterruptedException { int cache_size = BUF_LENGTH; if(cache_size > (int)file_length - alloc_pos) cache_size = (int)file_length - alloc_pos; cache_size -= UNIT_LENGTH; //等待填写当前正在读的那"一块"缓冲区 /**//*if(fill_bytes >= cache_size && writer_count > 0) { synchronized (buf) { buf.wait(); } return; }*/ //等待填满缓冲区 while (fill_bytes < cache_size) { if (writer_count == 0 || isalive == false) return; if(BUF_LENGTH > (int)file_length - alloc_pos) cache_size = (int)file_length - alloc_pos - UNIT_LENGTH; System.out.printf("\r[缓冲%1$6.2f%%] ",(float)fill_bytes / cache_size * 100); synchronized (buf) { buf.wait(); } } System.out.printf("\r"); } private int try_reading(int i, int len) throws Exception { int n = (i == UNIT_COUNT - 1) ? 0 : (i + 1); int r = (buf_bytes[i] == 0) ? 0 : (buf_bytes[i] + buf_bytes[n]); while (r < len) { if (writer_count == 0 || isalive == false) return r; try_cache(); r = (buf_bytes[i] == 0) ? 0 : (buf_bytes[i] + buf_bytes[n]); } return len; } /**//* * 各个"写"线程互斥等待空闲块 */ public synchronized boolean tryWriting(Writer w) throws InterruptedException { await_count++; while (buf_bytes[buf_index] != 0 && isalive) { this.wait(); } //下载速度足够就结束一个"写"线程 if(writer_count > 1 && w.await_count >= await_count && w.await_count >= writer_count) return false; if(alloc_pos >= file_length) return false; w.await_count = await_count; await_count--; w.start_pos = alloc_pos; w.index = buf_index; alloc_pos += UNIT_LENGTH; buf_index = (buf_index == UNIT_COUNT - 1) ? 0 : buf_index + 1; return isalive; } public void updateBuffer(int i, int len) { synchronized (buf) { buf_bytes[i] = len; fill_bytes += len; buf.notify(); } } public void updateWriterCount() { synchronized (buf) { writer_count--; buf.notify(); } } public synchronized void notifyWriter() { this.notifyAll(); } public void terminateWriters() { synchronized (buf) { if (isalive) { isalive = false; Header.strLastErr = "读取文件超时。重试 " + HttpReader.MAX_RETRY + " 次后放弃,请您稍后再试。"; } buf.notify(); } notifyWriter(); } public int read() throws Exception { int iret = -1; int i = read_pos >> UNIT_LENGTH_BITS; // 1."等待"有1字节可读 while (buf_bytes[i] < 1) { try_cache(); if (writer_count == 0) return -1; } if(isalive == false) return -1; // 2.读取 iret = buf[read_pos] & 0xff; fill_bytes--; file_pointer++; read_pos++; read_pos &= BUF_LENGTH_MASK; if (--buf_bytes[i] == 0) notifyWriter(); // 3.通知 return iret; } public int read(byte b[]) throws Exception { return read(b, 0, b.length); } public int read(byte[] b, int off, int len) throws Exception { if(len > UNIT_LENGTH) len = UNIT_LENGTH; int i = read_pos >> UNIT_LENGTH_BITS; // 1."等待"有足够内容可读 if(try_reading(i, len) < len || isalive == false) return -1; // 2.读取 int tail_len = BUF_LENGTH - read_pos; // write_pos != BUF_LENGTH if (tail_len < len) { System.arraycopy(buf, read_pos, b, off, tail_len); System.arraycopy(buf, 0, b, off + tail_len, len - tail_len); } else System.arraycopy(buf, read_pos, b, off, len); fill_bytes -= len; file_pointer += len; read_pos += len; read_pos &= BUF_LENGTH_MASK; buf_bytes[i] -= len; if (buf_bytes[i] < 0) { int ni = read_pos >> UNIT_LENGTH_BITS; buf_bytes[ni] += buf_bytes[i]; buf_bytes[i] = 0; notifyWriter(); } else if (buf_bytes[i] == 0) notifyWriter(); return len; } /**//* * 从src_off位置复制,不移动文件"指针" */ public int dump(int src_off, byte b[], int dst_off, int len) throws Exception { int rpos = read_pos + src_off; if(try_reading(rpos >> UNIT_LENGTH_BITS, len) < len || isalive == false) return -1; int tail_len = BUF_LENGTH - rpos; if (tail_len < len) { System.arraycopy(buf, rpos, b, dst_off, tail_len); System.arraycopy(buf, 0, b, dst_off + tail_len, len - tail_len); } else System.arraycopy(buf, rpos, b, dst_off, len); // 不发信号 return len; } public long length() { return file_length; } public long getFilePointer() { return file_pointer; } public void close() { // } // public void seek(long pos) throws Exception { // }
}