使用 Java NIO(ServerSocketChannel、Selector、SocketChannel)在服务器之间传输文件
说在前面:我拿到的需求是把大量文件(大约几十 TB)从服务器 A 发送到服务器 B:在服务器 A 为每个文件生成 MD5 码,在服务器 B 进行 MD5 校验,校验通过后再保存。
我的实现思路:先遍历源目录,把每个文件的绝对路径连同一个编号(如 AAA0000)逐行写入一个文本文件;客户端再逐行读取该文件,每个文件建立一次连接发送给服务器。生成的路径文件内容示例如下:
AAA0000:D:\upload\addChannel.html
AAA0001:D:\upload\addChannel2.html
AAA0002:D:\upload\addContactPerson.html
AAA0003:D:\upload\admin.html
AAA0004:D:\upload\businessOfChannel.html
....
AAA9999:D:\upload\admin1.html
AAB0000:D:\upload\businessOfChannel1.html
...
第一部分:将文件目录存储到文本中,文件夹不进行存储。
- import java.io.File;
- import java.io.FileOutputStream;
/**
 * Walks a directory tree and writes an index of every non-directory entry it finds.
 *
 * <p>Each index line has the form {@code <ID>:<absolute path>}, where the ID is a
 * three-letter prefix followed by a four-digit counter (AAA0000, AAA0001, ...,
 * AAA9999, AAB0000, ...). Directories themselves are not recorded.
 */
public class ReadAllPaths {
    /** Root of the directory tree whose files are indexed. */
    private static final String rootPath = "D:/upload/";
    /** Index file that receives one line per discovered file. */
    private static final String filePath = "G:/temp/unUploadedFilePath.txt";

    /*
     * prefix and num together form the line ID, for example AAA0001.
     * The ID exists mainly to make a line easy to search for.
     */
    private String prefix = "AAA";
    private int num = 0;

    /**
     * Rebuilds the index file from scratch.
     *
     * @param args unused
     * @throws Exception if the index file cannot be created or written
     */
    public static void main(String[] args) throws Exception {
        ReadAllPaths paths = new ReadAllPaths();
        // Opening without append truncates a previous index, and the buffered stream
        // avoids one system call per line. try-with-resources closes it even when
        // the walk fails half-way.
        try (java.io.OutputStream out =
                new java.io.BufferedOutputStream(new FileOutputStream(new File(filePath)))) {
            paths.getAllPaths(rootPath, out);
        }
    }

    /**
     * Recursively writes an index line for every non-directory entry below {@code root}.
     *
     * @param root path of the directory (or single file) to index
     * @param out  destination of the index lines
     * @throws java.io.IOException if writing to {@code out} fails
     */
    private void getAllPaths(String root, java.io.OutputStream out) throws java.io.IOException {
        File file = new File(root);
        if (file.isDirectory()) {
            // File.list() returns null when the directory cannot be read;
            // such directories are skipped, as are empty ones.
            String[] children = file.list();
            if (children == null) {
                return;
            }
            for (String child : children) {
                getAllPaths(new File(file, child).getPath(), out);
            }
        } else {
            // Platform default charset on purpose: the line is read back by a
            // FileReader on the same machine, which uses the same default.
            String line = getPathNum() + ":" + file.getAbsolutePath() + "\n";
            out.write(line.getBytes());
        }
    }

    /**
     * Returns the ID for the next index line and advances the counter.
     *
     * @return the current prefix followed by the zero-padded counter
     */
    private String getPathNum() {
        String id = getPrefix() + getNum();
        setNum();
        return id;
    }

    /**
     * Returns the current three-letter part of the ID.
     *
     * @return the current prefix
     */
    private String getPrefix() {
        return prefix;
    }

    /**
     * Advances the letter prefix by one step with carry:
     * AAA, AAB, ..., AAZ, ABA, ..., AZZ, BAA, ...
     * The first letter is incremented without an upper bound check.
     */
    private void setPrefix() {
        char[] ch = getPrefix().toCharArray();
        if (ch[2] != 'Z') {
            ch[2]++;
        } else {
            ch[2] = 'A';
            if (ch[1] != 'Z') {
                ch[1]++;
            } else {
                ch[1] = 'A';
                ch[0]++;
            }
        }
        prefix = new String(ch);
    }

    /**
     * Returns the numeric part of the ID, left-padded with zeros to four digits.
     *
     * @return the counter as a four-character string
     */
    private String getNum() {
        String digits = Integer.toString(num);
        StringBuilder padded = new StringBuilder(4);
        for (int i = digits.length(); i < 4; i++) {
            padded.append('0');
        }
        return padded.append(digits).toString();
    }

    /**
     * Advances the counter; after 9999 it wraps to 0000 and the letter prefix is advanced.
     */
    private void setNum() {
        if (num < 9999) {
            num++;
        } else {
            num = 0;
            setPrefix();
        }
    }
}
第二部分,服务器端代码
- import java.io.IOException;
- import java.net.InetSocketAddress;
- import java.nio.channels.SelectionKey;
- import java.nio.channels.Selector;
- import java.nio.channels.ServerSocketChannel;
- import java.util.Iterator;
- public class Server {
- Selector selector = null;
- ServerSocketChannel serverSocketChannel = null;
- private NioserverHandler2 handler;
- public Server() throws IOException {
- selector = Selector.open();
- // 打开服务器套接字通道
- serverSocketChannel = ServerSocketChannel.open();
- // 调整通道的阻塞模式非阻塞
- serverSocketChannel.configureBlocking(false);
- //serverSocketChannel.socket().setReuseAddress(true);
- serverSocketChannel.socket().bind(new InetSocketAddress(9999));
- serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
- }
- public Server(NioserverHandler2 handler) throws IOException {
- this();
- this.handler = handler;
- while (selector.select() > 0) {
- Iterator < SelectionKey > it = selector.selectedKeys().iterator();
- while (it.hasNext()) {
- SelectionKey s = it.next();
- it.remove();
- this.handler.excute((ServerSocketChannel) s.channel());
- }
- }
- }
- public static void main(String[] args) throws IOException {
- new Server(new NioserverHandler2());
- }
- }
- public class NioserverHandler2 {
- private final static String DIRECTORY = "G:\\NioRequest\\";
- /**
- * 这里边我们处理接收和发送
- *
- * @param serverSocketChannel
- */
- public void excute(ServerSocketChannel serverSocketChannel) {
- SocketChannel socketChannel = null;
- try {
- socketChannel = serverSocketChannel.accept(); // 等待客户端连接
- RequestObject2 requestObject = receiveData(socketChannel); // 接数据
- // logger.log(Level.INFO,requestObject.toString());
- String md5 = DigestUtils.md5Hex(requestObject.getContents());
- String response = "";
- if (md5.equals(requestObject.getMd5())) {
- response = (new ResponseObject("succeed", requestObject.getAbsolutePath(), "")).toString();
- File file = new File(DIRECTORY + requestObject.getRelativePath());
- if (!file.exists()) {
- file.mkdirs();
- }
- File file1 = new File(DIRECTORY + requestObject.getRelativePath() + requestObject.getFilename());
- if (!file1.exists()) {
- file1.createNewFile();
- }
- FileOutputStream fos = new FileOutputStream(file1);
- fos.write(requestObject.getContents());
- fos.close();
- }
- else {
- response = (new ResponseObject("failed", requestObject.getAbsolutePath(), "md5验证失败")).toString();
- }
- System.out.println(response);
- responseData(socketChannel, response);
- // logger.log(Level.INFO, response);
- }
- catch (IOException e) {
- e.printStackTrace();
- }
- }
- /**
- * <p>
- * 读取通道中的数据到Object里去
- * </p>
- *
- * @param socketChannel
- * @return
- * @throws IOException
- */
- public RequestObject2 receiveData(SocketChannel socketChannel) throws IOException {
- // 文件名
- String fileName = null;
- String relativePath = null;
- String absolutePath = null;
- String md5 = null;
- // 文件长度
- int contentLength = 0;
- // 文件内容
- byte[] contents = null;
- // 由于我们解析时前4个字节是文件名长度
- int capacity = 4;
- ByteBuffer buf = ByteBuffer.allocate(capacity);
- int size = 0;
- byte[] bytes = null;
- // 拿到文件名的长度
- size = socketChannel.read(buf);
- if (size >= 0) {
- buf.flip();
- capacity = buf.getInt();
- buf.clear();
- }
- // 拿文件名,相信文件名一次能够读完,如果你文件名超过1K 你有病了
- buf = ByteBuffer.allocate(capacity);
- size = socketChannel.read(buf);
- if (size >= 0) {
- buf.flip();
- bytes = new byte[size];
- buf.get(bytes);
- buf.clear();
- }
- String fileInfo = new String(bytes);
- System.out.println(fileInfo);
- fileName = fileInfo.split(";")[0];
- relativePath = fileInfo.split(";")[1];
- absolutePath = fileInfo.split(";")[2];
- md5 = fileInfo.split(";")[3];
- // 拿到文件长度
- capacity = 4;
- buf = ByteBuffer.allocate(capacity);
- size = socketChannel.read(buf);
- if (size >= 0) {
- buf.flip();
- // 文件长度是可要可不要的,如果你要做校验可以留下
- capacity = buf.getInt();
- buf.clear();
- }
- if (capacity == 0) {
- contents = new byte[] {};
- }
- else {
- // 用于接收buffer中的字节数组
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- // 文件可能会很大
- // capacity = 1024;
- buf = ByteBuffer.allocate(capacity);
- while ((size = socketChannel.read(buf)) >= 0) {
- buf.flip();
- bytes = new byte[size];
- buf.get(bytes);
- baos.write(bytes);
- buf.clear();
- }
- contents = baos.toByteArray();
- }
- RequestObject2 requestObject = new RequestObject2(fileName, relativePath, absolutePath, md5, contents);
- return requestObject;
- }
- private void responseData(SocketChannel socketChannel, String response) {
- ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());
- try {
- socketChannel.write(buffer);
- buffer.clear();
- // 确认要发送的东西发送完了关闭output 不然它端接收时socketChannel.read(Buffer)
- // 很可能造成阻塞 ,可以把这个(L)注释掉,会发现客户端一直等待接收数据
- socketChannel.socket().shutdownOutput(); // (L)
- }
- catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- import java.io.Serializable;
/**
 * Immutable holder for one upload request: the file's name, its directory
 * relative to the upload root, the path entry as sent by the client, the MD5
 * announced by the client and the file content.
 */
public class RequestObject2 implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String filename;
    private final String relativePath;
    private final String absolutePath;
    private final String md5;
    private final byte[] contents;

    /**
     * @param filename     name of the file without any directory part
     * @param relativePath directory of the file relative to the upload root
     * @param absolutePath path entry of the file as sent by the client
     * @param md5          MD5 hex digest announced by the client
     * @param contents     file content; stored by reference, not copied
     */
    public RequestObject2(String filename, String relativePath, String absolutePath, String md5, byte[] contents) {
        this.filename = filename;
        this.relativePath = relativePath;
        this.absolutePath = absolutePath;
        this.md5 = md5;
        this.contents = contents;
    }

    /** @return name of the file without any directory part */
    public String getFilename() {
        return filename;
    }

    /** @return directory of the file relative to the upload root */
    public String getRelativePath() {
        return relativePath;
    }

    /** @return path entry of the file as sent by the client */
    public String getAbsolutePath() {
        return absolutePath;
    }

    /** @return MD5 hex digest announced by the client */
    public String getMd5() {
        return md5;
    }

    /** @return the file content; the internal array itself, not a copy */
    public byte[] getContents() {
        return contents;
    }
}
第三部分 客户端代码
- import java.io.BufferedReader;
- import java.io.File;
- import java.io.FileReader;
- import java.io.IOException;
- import java.net.InetSocketAddress;
- import java.nio.channels.SelectionKey;
- import java.nio.channels.Selector;
- import java.nio.channels.SocketChannel;
- public class Client2 {
- private static final String unpath = "G:\\temp\\unUploadedFilePath.txt";
- private static final String pathPre = "D:\\upload\\";
- private static final String IPADDR = "127.0.0.1";
- private static final int PORT = 9999;
- Selector selector;
- public Client2() throws IOException {
- selector = Selector.open();
- new Thread(new SendDataRunnable()).start();
- }
- private class SendDataRunnable implements Runnable {
- private ClientHandler handler;
- public SendDataRunnable() {
- handler = new ClientHandler();
- }
- @Override
- public void run() {
- try {
- BufferedReader reader = new BufferedReader(new FileReader(new File(unpath)));
- String path = "";
- while ((path = reader.readLine()) != null && path.length() != 0) {
- SocketChannel socketChannel;
- socketChannel = SocketChannel.open();
- socketChannel.connect(new InetSocketAddress(IPADDR, PORT));
- socketChannel.configureBlocking(false);
- socketChannel.register(selector, SelectionKey.OP_READ);
- handler.sendData(socketChannel, path, pathPre);
- String response = handler.receiveData(socketChannel);
- System.out.println(response);
- socketChannel.close();
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- public static void main(String[] args) throws IOException {
- Client2 client = new Client2();
- }
- }
- import java.io.ByteArrayOutputStream;
- import java.io.File;
- import java.io.FileInputStream;
- import java.io.IOException;
- import java.nio.ByteBuffer;
- import java.nio.channels.SocketChannel;
- import org.apache.commons.codec.digest.DigestUtils;
- public class ClientHandler {
- public void sendData(SocketChannel socketChannel,String path,String pathPre)throws Exception{
- System.out.println(path);
- String absoluteFilePath=getAbsoluteFilePath(path);
- String fileName=getFileName(absoluteFilePath);
- String relativeFilePath=getRelativeFilePath(absoluteFilePath, pathPre,fileName);
- System.out.println(absoluteFilePath);
- byte[] bytes=makeFileToBytes(absoluteFilePath);
- System.out.println(bytes.length);
- String md5=DigestUtils.md5Hex(bytes);
- String fileInfo=new StringBuffer()
- .append(fileName)
- .append(";")
- .append(relativeFilePath)
- .append(";")
- .append(path)
- .append(";")
- .append(md5)
- .toString();
- System.out.println(fileInfo);
- ByteBuffer buffer = ByteBuffer.allocate(8 +fileInfo.getBytes().length+bytes.length);
- buffer.putInt(fileInfo.getBytes().length);
- buffer.put(fileInfo.getBytes());
- buffer.putInt(bytes.length);
- buffer.put(ByteBuffer.wrap(bytes));
- buffer.flip();
- socketChannel.write(buffer);
- buffer.clear();
- // 关闭输出流防止接受时阻塞,就是告诉接收方本次的内容已经发完了,你不用等了
- socketChannel.socket().shutdownOutput();
- }
- private String getAbsoluteFilePath(String path){
- return path.substring(8);
- }
- private String getRelativeFilePath(String absoluteFilePath,String pathPre,String fileName){
- return absoluteFilePath.substring(pathPre.length(),absoluteFilePath.length()-fileName.length());
- }
- private String getFileName(String path){
- return new File(path).getName();
- }
- private byte[] makeFileToBytes(String filePath){
- File file=new File(filePath);
- byte[] ret = null;
- try {
- FileInputStream in = new FileInputStream(file);
- ByteArrayOutputStream out = new ByteArrayOutputStream(4096);
- byte[] b = new byte[4096];
- int n;
- while ((n = in.read(b)) != -1) {
- out.write(b, 0, n);
- }
- in.close();
- out.close();
- ret = out.toByteArray();
- } catch (IOException e) {
- // log.error("helper:get bytes from file process error!");
- e.printStackTrace();
- }
- return ret;
- }
- public String receiveData(SocketChannel socketChannel) throws IOException {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- String response = "";
- try {
- ByteBuffer buffer = ByteBuffer.allocate(1024);
- byte[] bytes;
- int count = 0;
- while ((count = socketChannel.read(buffer)) >= 0) {
- buffer.flip();
- bytes = new byte[count];
- buffer.get(bytes);
- baos.write(bytes);
- buffer.clear();
- }
- bytes = baos.toByteArray();
- response = new String(bytes, "UTF-8");
- // socketChannel.socket().shutdownInput();
- } finally {
- try {
- baos.close();
- } catch (Exception ex) {
- }
- }
- return response;
- }
- }
/* 至此全部完成,注释不够多,部分代码是从网上找的。后期有时间会补全注释的,或者下次直接上最终使用的代码 */
来源: http://www.bubuko.com/infodetail-2156172.html