当前位置: 首页 > 图文教程 > 网络编程 > Javascript > 简要介绍实现多线程环形缓冲的方法

Javascript
jQuery生成asp.net服务器控件的代码
javascript 实现的完全兼容鼠标滚轴缩放图片的代码
JavaScript学习笔记(十七)js 优化
使用SyntaxHighlighter实现HTML高亮显示代码的方法
javascript contains和compareDocumentPosition 方法来确定是否HTML节点间的关系
利用jQuery 实现GridView异步排序、分页的代码
jquery.lazyload 实现图片延迟加载jquery插件
Lazy Load 延迟加载图片的 jQuery 插件
jquery 插件实现图片延迟加载效果代码
javascript小数计算出现近似值的解决办法
jquery1.4后 jqDrag 拖动 不可用
jquery 应用代码 方便的排序功能
选择TreeView控件的树状数据节点的JS方法(jquery)
jquery 图片Silhouette Fadeins渐显效果
JQuery Dialog(JS 模态窗口,可拖拽的DIV)
javascript 同时在IE和FireFox获取KeyCode的代码
js 键盘记录实现(兼容FireFox和IE)
javascript 函数速查表
jQuery AnythingSlider滑动效果插件
经典海量jQuery插件 大家可以收藏一下

Javascript 中的 简要介绍实现多线程环形缓冲的方法


出处:互联网   整理: 软晨网(RuanChen.com)   发布: 2009-09-28   浏览: 170 ::
收藏到网摘: n/a

 

我平时比较喜欢从网上听歌,有些链接下载速度太慢了。如果用HttpURLConnection类的方法打开连接,然后用InputStream类获得输入流,再用BufferedInputStream构造出带缓冲区的输入流,如果网速太慢的话,无论缓冲区设置多大,听起来都是断断续续的,达不到真正缓冲的目的。于是尝试编写代码实现用缓冲方式读取远程文件,以下贴出的代码是我写的MP3解码器的一部分。我是不怎么赞同使用多线程下载的,加之有的链接下载速度本身就比较快,所以在下载速度足够的情况下,就让下载线程退出,直到只剩下一个下载线程。当然,多线程中令人头痛的死锁问题、HttpURLConnection的超时阻塞问题都会使代码看起来异常复杂。

 

简要介绍一下实现多线程环形缓冲的方法。将缓冲区buf[]分为16块,每块32K,下载线程负责向缓冲区写数据,每次写一块;读线程(BuffRandAcceURL类)每次读小于32K的任意字节。同步描述:写/写互斥等待空闲块;写/写并发填写buf[];读/写并发使用buf[]。

 

经过我很长一段时间使用,我认为比较满意地实现了我的目标,同其它MP3播放器对比,我的这种方法能够比较流畅、稳定地下载并播放。我把实现多线程下载缓冲的方法写出来,不足之处恳请批评指正。

 

一、HttpReader类功能:HTTP协议从指定URL读取数据

 

/** *//**
* author by http://www.bt285.cn http://www.5a520.cn
*/
package instream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
public final class HttpReader { public static final int MAX_RETRY = 10; private static long content_length; private URL url; private HttpURLConnection httpConnection; private InputStream in_stream; private long cur_pos; //用于决定seek方法中是否执行文件定位 private int connect_timeout; private int read_timeout; public HttpReader(URL u) { this(u, 5000, 5000); } public HttpReader(URL u, int connect_timeout, int read_timeout) { this.connect_timeout = connect_timeout; this.read_timeout = read_timeout; url = u; if (content_length == 0) { int retry = 0; while (retry < HttpReader.MAX_RETRY) try { this.seek(0); content_length = httpConnection.getContentLength(); break; } catch (Exception e) { retry++; } } } public static long getContentLength() { return content_length; } public int read(byte[] b, int off, int len) throws IOException { int r = in_stream.read(b, off, len); cur_pos += r; return r; } public int getData(byte[] b, int off, int len) throws IOException { int r, rema = len; while (rema > 0) { if ((r = in_stream.read(b, off, rema)) == -1) { return -1; } rema -= r; off += r; cur_pos += r; } return len; } public void close() { if (httpConnection != null) { httpConnection.disconnect(); httpConnection = null; } if (in_stream != null) { try { in_stream.close(); } catch (IOException e) {} in_stream = null; } url = null; } /**//* * 抛出异常通知再试 * 响应码503可能是由某种暂时的原因引起的,例如同一IP频繁的连接请求可能遭服务器拒绝 */ public void seek(long start_pos) throws IOException { if (start_pos == cur_pos && in_stream != null) return; if (httpConnection != null) { httpConnection.disconnect(); httpConnection = null; } if (in_stream != null) { in_stream.close(); in_stream = null; } httpConnection = (HttpURLConnection) url.openConnection(); httpConnection.setConnectTimeout(connect_timeout); httpConnection.setReadTimeout(read_timeout); String sProperty = "bytes=" + start_pos + "-"; httpConnection.setRequestProperty("Range", sProperty); //httpConnection.setRequestProperty("Connection", "Keep-Alive"); int responseCode = httpConnection.getResponseCode(); if (responseCode < 200 || responseCode >= 300) { try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } throw new IOException("HTTP responseCode="+responseCode); } in_stream = httpConnection.getInputStream(); cur_pos = start_pos; }
}

 

二、IWriterCallBack接口功能:实现读/写通信。

 

package instream;
public interface IWriterCallBack { public boolean tryWriting(Writer w) throws InterruptedException; public void updateBuffer(int i, int len); public void updateWriterCount(); public void terminateWriters();
}

 

 

三、Writer类:下载线程,负责向buf[]写数据。

 

/** *//**
* http://www.bt285.cn http://www.5a520.cn
*/
package instream;
import java.io.IOException;
import java.net.URL;
public final class Writer implements Runnable { private static boolean isalive = true; private byte[] buf; private IWriterCallBack icb; protected int index; //buf[]内"块"索引号 protected long start_pos; //index对应的文件位置(相对于文件首的偏移量) protected int await_count; //用于判断:下载速度足够就退出一个"写"线程 private HttpReader hr; public Writer(IWriterCallBack call_back, URL u, byte[] b, int i) { hr = new HttpReader(u); if(HttpReader.getContentLength() == 0) //实例化HttpReader对象都不成功 return; icb = call_back; buf = b; Thread t = new Thread(this,"dt_"+i); t.setPriority(Thread.NORM_PRIORITY + 1); t.start(); } public void run() { int write_bytes=0, write_pos=0, rema = 0, retry = 0; boolean cont = true; while (cont) { try { // 1.等待空闲块 if(retry == 0) { if (icb.tryWriting(this) == false) break; write_bytes = 0; rema = BuffRandAcceURL.UNIT_LENGTH; write_pos = index << BuffRandAcceURL.UNIT_LENGTH_BITS; } // 2.定位 hr.seek(start_pos); // 3.下载"一块" int w; while (rema > 0 && isalive) { w = (rema < 2048) ? rema : 2048; //每次读几K合适? if ((w = hr.read(buf, write_pos, w)) == -1) { cont = false; break; } rema -= w; write_pos += w; start_pos += w; write_bytes += w; } //4.通知"读"线程 retry = 0; icb.updateBuffer(index, write_bytes); } catch (InterruptedException e) { isalive = false; icb.terminateWriters(); break; } catch (IOException e) { if(++retry == HttpReader.MAX_RETRY) { isalive = false; icb.terminateWriters(); break; } } } icb.updateWriterCount(); try { hr.close(); } catch (Exception e) {} hr = null; buf = null; icb = null; }
}

 

四、IRandomAccess接口:

 

随机读取文件接口,BuffRandAcceURL类和BuffRandAcceFile类实现接口方法。BuffRandAcceFile类实现读取本地磁盘文件,这儿就不给出其源码了。

 

package instream;
public interface IRandomAccess { public int read() throws Exception; public int read(byte b[]) throws Exception; public int read(byte b[], int off, int len) throws Exception; public int dump(int src_off, byte b[], int dst_off, int len) throws Exception; public void seek(long pos) throws Exception; public long length(); public long getFilePointer(); public void close();
}

 

五、BuffRandAcceURL类功能:创建下载线程;read方法从buf[]读数据。

 

关键是如何简单有效防止死锁?以下只是我的一次尝试,请指正。

 

/** *//**
* http://www.5a520.cn http://www.bt285.cn
*/
package instream;
import java.net.URL;
import java.net.URLDecoder;
import decode.Header;
import tag.MP3Tag;
import tag.TagThread;
public final class BuffRandAcceURL implements IRandomAccess, IWriterCallBack { public static final int UNIT_LENGTH_BITS = 15; //32K public static final int UNIT_LENGTH = 1 << UNIT_LENGTH_BITS; public static final int BUF_LENGTH = UNIT_LENGTH << 4; //16块 public static final int UNIT_COUNT = BUF_LENGTH >> UNIT_LENGTH_BITS; public static final int BUF_LENGTH_MASK = (BUF_LENGTH - 1); private static final int MAX_WRITER = 8; private static long file_pointer; private static int read_pos; private static int fill_bytes; private static byte[] buf; //同时也作读写同步锁:buf.wait()/buf.notify() private static int[] buf_bytes; private static int buf_index; private static int alloc_pos; private static URL url = null; private static boolean isalive = true; private static int writer_count; private static int await_count; private long file_length; private long frame_bytes; public BuffRandAcceURL(String sURL) throws Exception { this(sURL,MAX_WRITER); } public BuffRandAcceURL(String sURL, int download_threads) throws Exception { buf = new byte[BUF_LENGTH]; buf_bytes = new int[UNIT_COUNT]; url = new URL(sURL); //创建线程以异步方式解析ID3 new TagThread(url); //打印当前文件名 try { String s = URLDecoder.decode(sURL, "GBK"); System.out.println("start>> " + s.substring(s.lastIndexOf("/") + 1)); s = null; } catch (Exception e) { System.out.println("start>> " + sURL); } //创建"写"线程 for(int i = 0; i < download_threads; i++) new Writer(this, url, buf, i+1); frame_bytes = file_length = HttpReader.getContentLength(); if(file_length == 0) { Header.strLastErr = "连接URL出错,重试 " + HttpReader.MAX_RETRY + " 次后放弃。"; throw new Exception("retry " + HttpReader.MAX_RETRY); } writer_count = download_threads; //缓冲 try_cache(); //跳过ID3 v2 MP3Tag mP3Tag = new MP3Tag(); int v2_size = mP3Tag.checkID3V2(buf,0); if (v2_size > 0) { frame_bytes -= v2_size; //seek(v2_size): fill_bytes -= v2_size; file_pointer = v2_size; read_pos = v2_size; read_pos &= BUF_LENGTH_MASK; int units = v2_size >> UNIT_LENGTH_BITS; for(int i = 0; i < units; i++) { buf_bytes[i] = 0; this.notifyWriter(); } buf_bytes[units] -= v2_size; this.notifyWriter(); } mP3Tag = null; } private void try_cache() throws InterruptedException { int cache_size = BUF_LENGTH; if(cache_size > (int)file_length - alloc_pos) cache_size = (int)file_length - alloc_pos; cache_size -= UNIT_LENGTH; //等待填写当前正在读的那"一块"缓冲区 /**//*if(fill_bytes >= cache_size && writer_count > 0) { synchronized (buf) { buf.wait(); } return; }*/ //等待填满缓冲区 while (fill_bytes < cache_size) { if (writer_count == 0 || isalive == false) return; if(BUF_LENGTH > (int)file_length - alloc_pos) cache_size = (int)file_length - alloc_pos - UNIT_LENGTH; System.out.printf("\r[缓冲%1$6.2f%%] ",(float)fill_bytes / cache_size * 100); synchronized (buf) { buf.wait(); } } System.out.printf("\r"); } private int try_reading(int i, int len) throws Exception { int n = (i == UNIT_COUNT - 1) ? 0 : (i + 1); int r = (buf_bytes[i] == 0) ? 0 : (buf_bytes[i] + buf_bytes[n]); while (r < len) { if (writer_count == 0 || isalive == false) return r; try_cache(); r = (buf_bytes[i] == 0) ? 0 : (buf_bytes[i] + buf_bytes[n]); } return len; } /**//* * 各个"写"线程互斥等待空闲块 */ public synchronized boolean tryWriting(Writer w) throws InterruptedException { await_count++; while (buf_bytes[buf_index] != 0 && isalive) { this.wait(); } //下载速度足够就结束一个"写"线程 if(writer_count > 1 && w.await_count >= await_count && w.await_count >= writer_count) return false; if(alloc_pos >= file_length) return false; w.await_count = await_count; await_count--; w.start_pos = alloc_pos; w.index = buf_index; alloc_pos += UNIT_LENGTH; buf_index = (buf_index == UNIT_COUNT - 1) ? 0 : buf_index + 1; return isalive; } public void updateBuffer(int i, int len) { synchronized (buf) { buf_bytes[i] = len; fill_bytes += len; buf.notify(); } } public void updateWriterCount() { synchronized (buf) { writer_count--; buf.notify(); } } public synchronized void notifyWriter() { this.notifyAll(); } public void terminateWriters() { synchronized (buf) { if (isalive) { isalive = false; Header.strLastErr = "读取文件超时。重试 " + HttpReader.MAX_RETRY + " 次后放弃,请您稍后再试。"; } buf.notify(); } notifyWriter(); } public int read() throws Exception { int iret = -1; int i = read_pos >> UNIT_LENGTH_BITS; // 1."等待"有1字节可读 while (buf_bytes[i] < 1) { try_cache(); if (writer_count == 0) return -1; } if(isalive == false) return -1; // 2.读取 iret = buf[read_pos] & 0xff; fill_bytes--; file_pointer++; read_pos++; read_pos &= BUF_LENGTH_MASK; if (--buf_bytes[i] == 0) notifyWriter(); // 3.通知 return iret; } public int read(byte b[]) throws Exception { return read(b, 0, b.length); } public int read(byte[] b, int off, int len) throws Exception { if(len > UNIT_LENGTH) len = UNIT_LENGTH; int i = read_pos >> UNIT_LENGTH_BITS; // 1."等待"有足够内容可读 if(try_reading(i, len) < len || isalive == false) return -1; // 2.读取 int tail_len = BUF_LENGTH - read_pos; // write_pos != BUF_LENGTH if (tail_len < len) { System.arraycopy(buf, read_pos, b, off, tail_len); System.arraycopy(buf, 0, b, off + tail_len, len - tail_len); } else System.arraycopy(buf, read_pos, b, off, len); fill_bytes -= len; file_pointer += len; read_pos += len; read_pos &= BUF_LENGTH_MASK; buf_bytes[i] -= len; if (buf_bytes[i] < 0) { int ni = read_pos >> UNIT_LENGTH_BITS; buf_bytes[ni] += buf_bytes[i]; buf_bytes[i] = 0; notifyWriter(); } else if (buf_bytes[i] == 0) notifyWriter(); return len; } /**//* * 从src_off位置复制,不移动文件"指针" */ public int dump(int src_off, byte b[], int dst_off, int len) throws Exception { int rpos = read_pos + src_off; if(try_reading(rpos >> UNIT_LENGTH_BITS, len) < len || isalive == false) return -1; int tail_len = BUF_LENGTH - rpos; if (tail_len < len) { System.arraycopy(buf, rpos, b, dst_off, tail_len); System.arraycopy(buf, 0, b, dst_off + tail_len, len - tail_len); } else System.arraycopy(buf, rpos, b, dst_off, len); // 不发信号 return len; } public long length() { return file_length; } public long getFilePointer() { return file_pointer; } public void close() { // } // public void seek(long pos) throws Exception { // }
}