本文属于 Android 局域网内的语音对讲项目系列,《实时 Android 语音对讲系统架构》阐述了局域网内 Android 语音对讲功能的框架,本文在此基础上进行了优化,包括音频的录制、播放,通信方式,以及整体架构的改进。
本文主要包括以下内容:
在《实时 Android 语音对讲系统架构》对语音对讲系统的数据链路的分析中提到,数据包要经过 Record、Encoder、Transmission、Decoder、Play 这一链条的处理,这种数据流转就是对讲机核心抽象,鉴于这种场景,采用了责任链设计模式。
在后续实践中发现这样的结构存在一些问题,责任链模式适用于数据即时流转,需要整个链路没有阻塞、等待。而在本应用场景中,编解码及录制播放均可能存在时间延迟,责任链模式无法兼顾网络、编解码的延时。
事实上,通过缓存队列则可以保证数据链路的稳定性,分别在编解码和数据发送接收时加入阻塞队列,可以实现数据包的缓冲,同时降低丢包的可能。因此,在本系统场景下,基于阻塞队列实现了生产者 - 消费者模式,是对责任链模式的优化,意在提高数据链路的鲁棒性。
本节包括以下内容:
阻塞队列(数据结构)
阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:
阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。
阻塞队列提供了四种处理方法:
方法 \ 处理方式 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 | |
---|---|---|---|---|---|
插入方法 | add(e) | offer(e) | put(e) | offer(e,time,unit) | |
移除方法 | remove() | poll() | take() | poll(time,unit) | |
检查方法 | element() | peek() | 不可用 | 不可用 |
异常。当队列为空时,从队列里获取元素时会抛出
- IllegalStateException("Queue full")
异常 。
- NoSuchElementException
本文通过
的
- LinkedBlockingQueue
和
- put
方法实现线程阻塞。
- take
是一个用链表实现的有界阻塞队列。此队列的默认和最大长度为
- LinkedBlockingQueue
。此队列按照先进先出的原则对元素进行排序。
- Integer.MAX_VALUE
首先看下
中核心的域:
- LinkedBlockingQueue
- static class Node < E > {
- E item;
- Node < E > next;
- Node(E x) {
- item = x;
- }
- }
- private final int capacity;
- private final AtomicInteger count = new AtomicInteger();
- transient Node < E > head;
- private transient Node < E > last;
- private final ReentrantLock takeLock = new ReentrantLock();
- private final Condition notEmpty = takeLock.newCondition();
- private final ReentrantLock putLock = new ReentrantLock();
- private final Condition notFull = putLock.newCondition();
和
- LinkedBlockingQueue
类似,通过静态内部类
- LinkedList
进行元素的存储;
- Node<E>
表示阻塞队列所能存储的最大容量,在创建时可以手动指定最大容量,默认的最大容量为
- capacity
;
- Integer.MAX_VALUE
表示当前队列中的元素数量,
- count
的入队列和出队列使用了两个不同的 lock 对象,因此无论是在入队列还是出队列,都会涉及对元素数量的并发修改,因此这里使用了一个原子操作类来解决对同一个变量进行并发修改的线程安全问题。
- LinkedBlockingQueue
和
- head
分别表示链表的头部和尾部;
- last
表示元素出队列时线程所获取的锁,当执行
- takeLock
、
- take
等操作时线程获取;
- poll
当队列为空时,通过该
- notEmpty
让获取元素的线程处于等待状态;
- Condition
表示元素入队列时线程所获取的锁,当执行
- putLock
、
- put
等操作时获取;
- offer
当队列容量达到
- notFull
时,通过该
- capacity
让加入元素的线程处于等待状态。
- Condition
其次,
有三个构造方法,分别如下:
- LinkedBlockingQueue
- public LinkedBlockingQueue() {
- this(Integer.MAX_VALUE);
- }
- public LinkedBlockingQueue(int capacity) {
- if (capacity <= 0) throw new IllegalArgumentException();
- this.capacity = capacity;
- last = head = new Node<E>(null);
- }
- public LinkedBlockingQueue(Collection<? extends E> c) {
- this(Integer.MAX_VALUE);
- final ReentrantLock putLock = this.putLock;
- putLock.lock(); // Never contended, but necessary for visibility
- try {
- int n = 0;
- for (E e : c) {
- if (e == null)
- throw new NullPointerException();
- if (n == capacity)
- throw new IllegalStateException("Queue full");
- enqueue(new Node<E>(e));
- ++n;
- }
- count.set(n);
- } finally {
- putLock.unlock();
- }
- }
默认构造函数直接调用
,
- LinkedBlockingQueue(int capacity)
会初始化首尾节点,并置位 null。
- LinkedBlockingQueue(int capacity)
在初始化队列的同时,将一个集合的全部元素加入队列。
- LinkedBlockingQueue(Collection<? extends E> c)
最后,重点分析下
和
- put
的过程:
- take
- public void put(E e) throws InterruptedException {
- if (e == null) throw new NullPointerException();
- int c = -1;
- Node<E> node = new Node<E>(e);
- final ReentrantLock putLock = this.putLock;
- final AtomicInteger count = this.count;
- putLock.lockInterruptibly();
- try {
- while (count.get() == capacity) {
- notFull.await();
- }
- enqueue(node);
- c = count.getAndIncrement();
- if (c + 1 < capacity)
- notFull.signal();
- } finally {
- putLock.unlock();
- }
- if (c == 0)
- signalNotEmpty();
- }
- public E take() throws InterruptedException {
- E x;
- int c = -1;
- final AtomicInteger count = this.count;
- final ReentrantLock takeLock = this.takeLock;
- takeLock.lockInterruptibly();
- try {
- while (count.get() == 0) {
- notEmpty.await();
- }
- x = dequeue();
- c = count.getAndDecrement();
- if (c > 1)
- notEmpty.signal();
- } finally {
- takeLock.unlock();
- }
- if (c == capacity)
- signalNotFull();
- return x;
- }
之所以把
和
- put
放在一起,是因为它们是一对互逆的过程:
- take
在插入元素前首先获得
- put
和当前队列的元素数量,
- putLock
在去除元素钱首先获得
- take
和当前队列的元素数量;
- takeLock
时需要判断当前队列是否已满,已满时当前线程进行等待,
- put
时需要判断队列是否已空,队列为空时当前线程进行等待;
- take
调用
- put
在队尾插入元素,并修改尾指针,
- enqueue
调用
- take
将
- dequeue
指向原来
- head
的位置,并将 first 的数据域置位 null,实现删除原
- first
指针,并产生新的
- first
,同时,切断原
- head
节点的引用,便于垃圾回收。
- head
- private void enqueue(Node<E> node) {
- last = last.next = node;
- }
- private E dequeue() {
- Node<E> h = head;
- Node<E> first = h.next;
- h.next = h; // help GC
- head = first;
- E x = first.item;
- first.item = null;
- return x;
- }
根据
- put
决定是否触发队列未满和队列空;
- count
根据
- take
决定是否触发队列未空和队列满。
- count
在入队列和出队列时使用的是不同的 Lock,这也意味着它们之间的操作不会存在互斥。在多个 CPU 的情况下,可以做到在同一时刻既消费、又生产,做到并行处理。
- LinkedBlockingQueue
阻塞队列实现生产者 - 消费者模式
通过对
主要源码的分析,实现生产者 - 消费者模式就变得简单了。
- LinkedBlockingQueue
- public class MessageQueue {
- private static MessageQueue messageQueue1,
- messageQueue2,
- messageQueue3,
- messageQueue4;
- private BlockingQueue < AudioData > audioDataQueue = null;
- private MessageQueue() {
- audioDataQueue = new LinkedBlockingQueue < >();
- }@Retention(SOURCE)@IntDef({
- ENCODER_DATA_QUEUE,
- SENDER_DATA_QUEUE,
- DECODER_DATA_QUEUE,
- TRACKER_DATA_QUEUE
- }) public@interface DataQueueType {}
- public static final int ENCODER_DATA_QUEUE = 0;
- public static final int SENDER_DATA_QUEUE = 1;
- public static final int DECODER_DATA_QUEUE = 2;
- public static final int TRACKER_DATA_QUEUE = 3;
- public static MessageQueue getInstance(@DataQueueType int type) {
- switch (type) {
- case ENCODER_DATA_QUEUE:
- if (messageQueue1 == null) {
- messageQueue1 = new MessageQueue();
- }
- return messageQueue1;
- case SENDER_DATA_QUEUE:
- if (messageQueue2 == null) {
- messageQueue2 = new MessageQueue();
- }
- return messageQueue2;
- case DECODER_DATA_QUEUE:
- if (messageQueue3 == null) {
- messageQueue3 = new MessageQueue();
- }
- return messageQueue3;
- case TRACKER_DATA_QUEUE:
- if (messageQueue4 == null) {
- messageQueue4 = new MessageQueue();
- }
- return messageQueue4;
- default:
- return new MessageQueue();
- }
- }
- public void put(AudioData audioData) {
- try {
- audioDataQueue.put(audioData);
- } catch(InterruptedException e) {
- e.printStackTrace();
- }
- }
- public AudioData take() {
- try {
- return audioDataQueue.take();
- } catch(InterruptedException e) {
- e.printStackTrace();
- }
- return null;
- }
- }
这里通过
来实现限定输入类型的功能,同时,阻塞队列保持单实例,然后将队列分别应用到各个生产者 - 消费者线程中。在本文的语音对讲系统中,以音频录制线程和编码线程为例,录制线程是音频数据包的生产者,编码线程是音频数据包的消费者。
- @IntDef
音频录制线程:
- @Override
- public void run() {
- while (isRecording) {
- if (audioRecord.getRecordingState() == AudioRecord.RECORDSTATE_STOPPED) {
- audioRecord.startRecording();
- }
- // 实例化音频数据缓冲
- short[] rawData = new short[inAudioBufferSize];
- audioRecord.read(rawData, 0, inAudioBufferSize);
- AudioData audioData = new AudioData(rawData);
- MessageQueue.getInstance(MessageQueue.ENCODER_DATA_QUEUE).put(audioData);
- }
- }
编码线程:
- @Override
- public void run() {
- AudioData data;
- // 在MessageQueue为空时,take方法阻塞
- while ((data = MessageQueue.getInstance(MessageQueue.ENCODER_DATA_QUEUE).take()) != null) {
- data.setEncodedData(AudioDataUtil.raw2spx(data.getRawData()));
- MessageQueue.getInstance(MessageQueue.SENDER_DATA_QUEUE).put(data);
- }
- }
同样的,编码线程和发送线程,接收线程和解码线程,解码线程和播放线程同样存在生产者 - 消费者的关系。
)获取改为
- MIC
,
- MediaRecorder.AudioSource.VOICE_COMMUNICATION
能自动回声消除和增益,因此,屏蔽了 speex 在 C 层的降噪和增益。
- VOICE_COMMUNICATION
换成
- STREAM_MUSIC
,因为,对讲机应用更类似于语音通信。换成
- STREAM_VOICE_CALL
之后,遇到的问题是只能从听筒听到声音,于是设置免提功能。
- STREAM_VOICE_CALL
该设置必须要开放修改音频的权限,不然没有效果。
- AudioManager audioManager =(AudioManager) getSystemService(Context.AUDIO_SERVICE);
- audioManager.setMode(AudioManager.MODE_IN_COMMUNICATION);
- audioManager.setSpeakerphoneOn(true);
- <uses-permission android:name="android.permission.MODIFY_AUDIO_SETTINGS"
- />
目前的语音通信质量,个人感觉仍然需要继续优化,如果您有这方面的经验(包括但不限于 Java 层和 Speex 音频处理),不吝赐教!
《通过 UDP 广播实现 Android 局域网 Peer Discovering》中从编程的角度说明了 TCP 与 UDP 的区别,主要分析了 TCP 是面向连接的、可靠的服务,建立连接需要经过三次握手、销毁连接需要四次挥手;UDP 是无连接的传输层协议,提供面向事务的简单不可靠信息传送服务。
IP 地址分为三类:单播、广播和多播。广播和多播仅用于 UDP,它们用于将报文同时传送给多个接收者。广播分为:受限广播、指向网络的广播、指向子网的广播、指向所有子网的广播。
举个栗子:当前 IP 为 10.13.200.16/22,首先广播地址为 255.255.255.255,子网广播地址为 10.13.203.255。
《通过 UDP 广播实现 Android 局域网 Peer Discovering》采用子网广播实现局域网 Android 设备的发现,但在实践中,一般路由器会禁止所有广播跨路由器传输。所以,如果子网内有多个路由器,那么就无法实现设备发现了。因此,本文将设备发现也改为多播实现。多播组地址包括为 1110 的最高 4bit 和多播组号,范围为 224.0.0.0 到 239.255.255.255。能够接收发往一个特定多播组地址数据的主机集合称为主机组,主机组可以跨越多个网络。
IANA 把 224.0.0.0 到 224.0.0.255 范围内的地址全部都保留给了路由协议和其他网络维护功能。该范围内的地址属于局部范畴,不论生存时间字段(TTL)值是多少,都不会被路由器转发;D 类保留地址的完整的列表可以参见 RFC1700。 224.0.1.0 到 238.255.255.255 地址范围作为用户组播地址,在全网范围内有效。其中 233/8 为 GLOP 地址。GLOP 是一种自治系统之间的组播地址分配机制,将 AS 号直接填入组播地址的中间两个字节中,每个自治系统都可以得到 255 个组播地址; 239.0.0.0 到 239.255.255.255 地址范围为本地管理组播地址(administratively scoped addresses),仅在特定的本地范围内有效。
本文对比了子网广播和多播,子网广播地址为:192.168.137.255,多播组地址为:224.5.6.7。
发送接收采用同一 MulticastSocket,
设置
- MulticastSocket
,
- TTL
表示跨网络的级数。
- TTL
- try {
- inetAddress = InetAddress.getByName(Constants.MULTI_BROADCAST_IP);
- multicastSocket = new MulticastSocket(Constants.MULTI_BROADCAST_PORT);
- multicastSocket.setLoopbackMode(true);
- multicastSocket.joinGroup(inetAddress);
- multicastSocket.setTimeToLive(4);
- } catch (IOException e) {
- e.printStackTrace();
- }
涉及到另一个协议:网路群组管理协议(Internet Group Management Protocol 或简写 IGMP),通过抓包可以观察到初始化
- joinGroup
时加入组协议的报文。
- MulticastSocket
用于设置生存时间字段。默认情况下,多播数据报的 TTL 设置为 1,使得多播数据报仅限于在同一个子网内传送,更大的 TTL 值能够被多播路由器转发。在实际传输过程中,多播组地址仍然需要转换为以太网地址。实际转换规则这里不再赘述。
- setTimeToLive
上述多播地址 224.5.6.7 转换后为 01:00:5e:05:06:07。
代码层面上,探测线程将子网广播改为多播实现。
- if (command != null) {
- byte[] data = command.getBytes();
- DatagramPacket datagramPacket = new DatagramPacket(
- data, data.length, Multicast.getMulticast().getInetAddress(), Constants.MULTI_BROADCAST_PORT);
- try {
- Multicast.getMulticast().getMulticastSocket().send(datagramPacket);
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
并且在接收端区分指令和音频数据。
- while (true) {
- // 设置接收缓冲段
- byte[] receivedData = new byte[512];
- DatagramPacket datagramPacket = new DatagramPacket(receivedData, receivedData.length);
- try {
- // 接收数据报文
- Multicast.getMulticast().getMulticastSocket().receive(datagramPacket);
- } catch (IOException e) {
- e.printStackTrace();
- }
- // 判断数据报文类型,并做相应处理
- if (datagramPacket.getLength() == Command.DISC_REQUEST.getBytes().length ||
- datagramPacket.getLength() == Command.DISC_LEAVE.getBytes().length ||
- datagramPacket.getLength() == Command.DISC_RESPONSE.getBytes().length) {
- handleCommandData(datagramPacket);
- } else {
- handleAudioData(datagramPacket);
- }
- }
在实际工程应用场景中,需要对讲机进程即使切换到后台,也依然能收到信息。因此,为了提高进程的优先级,降低被系统回收的概率,采用了在 Service 中访问网络服务,处理语音信息的发送和接收的方案。前台 Activity 负责显示组播组内用户(上线和下线,更新页面),通过 AIDL 与 Service 进行跨进程通信和回调。Service 的清单说明如下:
- <service
- android:name=".service.IntercomService"
- android:process=":intercom" />
表示定义子进程 intercom。
- :intercom
使用多进程相比于常见的单进程,有一些需要注意的点:
因此,通过
定义了多进程之后,一定要避免单进程模式下对象共享的思路。另外,在 AS 中调试多进程应用的时候,断点一定要针对不同的进程,以本文为例,添加断点需要选择主进程和 intercom 进程。给两个进程分别添加调试断点后,可以看到有两个 Debugger:3156 和 3230(由于存在 Jni 代码,所以显示了 Hybrid Debugger)。
- process
由于既存在 Activity 到 Service 的通信,也存在 Service 接收到消息之后更新 Activity 页面的需求,所以这里采用了跨进程回调的方式。首先,AIDL 方法如下:
- package com.jd.wly.intercom.service;
- import com.jd.wly.intercom.service.IIntercomCallback;
- interface IIntercomService {
- void startRecord();
- void stopRecord();
- void registerCallback(IIntercomCallback callback);
- void unRegisterCallback(IIntercomCallback callback);
- }
- package com.jd.wly.intercom.service;
- interface IIntercomCallback {
- void findNewUser(String ipAddress);
- void removeUser(String ipAddress);
- }
定义了 Activity 到 Service 的通信方法,包含启动和停止音频录制,以及注册和解除回调接口;
- IIntercomService
定义了从 Service 到 Activity 的回调接口,用于在 Service 发现用户上线、下线时通知前台 Activity 的显示。
- IIntercomCallback
AIDL 文件的定义涉及一些规范:比如变量在同一包内也需要 import,非基本数据类型参数列表需要指明 in、out,自定义参数类型需要同时编写 java 文件和 aidl 文件等,本文篇幅有限,就不具体展开 AIDL 跨进程通信的细节了。
Activity 检测用户的按键操作,然后将事件传递给 Service 进行对应的逻辑处理。 将 Service 绑定到 Activity 首先需要定义
:
- ServiceConnection
- /**
- * onServiceConnected和onServiceDisconnected运行在UI线程中
- */
- private IIntercomService intercomService;
- private ServiceConnection serviceConnection = new ServiceConnection() {
- @Override
- public void onServiceConnected(ComponentName name, IBinder service) {
- intercomService = IIntercomService.Stub.asInterface(service);
- try {
- intercomService.registerCallback(intercomCallback);
- } catch (RemoteException e) {
- e.printStackTrace();
- }
- }
- @Override
- public void onServiceDisconnected(ComponentName name) {
- intercomService = null;
- }
- };
在
时绑定 Service,
- onStart()
时解除回调和绑定。
- onStop()
- @Override
- protected void onStart() {
- super.onStart();
- Intent intent = new Intent(AudioActivity.this, IntercomService.class);
- bindService(intent, serviceConnection, BIND_AUTO_CREATE);
- }
- @Override
- protected void onStop() {
- super.onStop();
- if (intercomService != null && intercomService.asBinder().isBinderAlive()) {
- try {
- intercomService.unRegisterCallback(intercomCallback);
- } catch (RemoteException e) {
- e.printStackTrace();
- }
- unbindService(serviceConnection);
- }
- }
Activity 获取了 Service 的服务后,分别在按键事件处理中进行调用。
- @Override
- public boolean onKeyDown(int keyCode, KeyEvent event) {
- if ((keyCode == KeyEvent.KEYCODE_F2 ||
- keyCode == KeyEvent.KEYCODE_VOLUME_UP || keyCode == KeyEvent.KEYCODE_VOLUME_DOWN)) {
- try {
- intercomService.startRecord();
- } catch (RemoteException e) {
- e.printStackTrace();
- }
- return true;
- }
- return super.onKeyDown(keyCode, event);
- }
- @Override
- public boolean onKeyUp(int keyCode, KeyEvent event) {
- if ((keyCode == KeyEvent.KEYCODE_F2 ||
- keyCode == KeyEvent.KEYCODE_VOLUME_UP || keyCode == KeyEvent.KEYCODE_VOLUME_DOWN)) {
- try {
- intercomService.stopRecord();
- } catch (RemoteException e) {
- e.printStackTrace();
- }
- return true;
- }
- return super.onKeyUp(keyCode, event);
- }
和
- startRecord
的具体实现定义在 Service 中:
- stopRecord
- public IIntercomService.Stub mBinder = new IIntercomService.Stub() {
- @Override
- public void startRecord() throws RemoteException {
- if (!recorder.isRecording()) {
- recorder.setRecording(true);
- tracker.setPlaying(false);
- threadPool.execute(recorder);
- }
- }
- @Override
- public void stopRecord() throws RemoteException {
- if (recorder.isRecording()) {
- recorder.setRecording(false);
- tracker.setPlaying(true);
- }
- }
- @Override
- public void registerCallback(IIntercomCallback callback) throws RemoteException {
- mCallbackList.register(callback);
- }
- @Override
- public void unRegisterCallback(IIntercomCallback callback) throws RemoteException {
- mCallbackList.unregister(callback);
- }
- };
Service 通过
保持回调方法,使用时首先定义
- RemoteCallbackList
对象,泛型类型为
- RemoteCallbackList
。
- IIntercomCallback
- private RemoteCallbackList<IIntercomCallback> mCallbackList = new RemoteCallbackList<>();
并不是
- RemoteCallbackList
,内部通过 Map 来保存,Key 和 Value 分别为
- List
和
- IBinder
。
- Callback
- ArrayMap < IBinder,
- Callback > mCallbacks = new ArrayMap < IBinder,
- Callback > ();
使用
回调 Activity 方法时,通过
- RemoteCallbackList
获取数量,
- beginBroadcast
- /**
- * 发现新的组播成员
- *
- * @param ipAddress IP地址
- */
- private void findNewUser(String ipAddress) {
- final int size = mCallbackList.beginBroadcast();
- for (int i = 0; i < size; i++) {
- IIntercomCallback callback = mCallbackList.getBroadcastItem(i);
- if (callback != null) {
- try {
- callback.findNewUser(ipAddress);
- } catch (RemoteException e) {
- e.printStackTrace();
- }
- }
- }
- mCallbackList.finishBroadcast();
- }
方法与
- removeUser(String ipAddress)
方法类似。它们具体的实现在 Activity 中:
- findNewUser(String ipAddress)
- /**
- * 被调用的方法运行在Binder线程池中,不能更新UI
- */
- private IIntercomCallback intercomCallback = new IIntercomCallback.Stub() {
- @Override
- public void findNewUser(String ipAddress) throws RemoteException {
- sendMsg2MainThread(ipAddress, FOUND_NEW_USER);
- }
- @Override
- public void removeUser(String ipAddress) throws RemoteException {
- sendMsg2MainThread(ipAddress, REMOVE_USER);
- }
- };
需要注意的是,
中的回调方法实现并不在 UI 线程中执行,如果需要更新 UI,需要实现多线程调用,多线程依然通过 Handler 来实现,这里不再赘述,如果需要,请参考:《Android 线程管理(一)——线程通信》。
- IIntercomCallback
来源: http://www.cnblogs.com/younghao/p/6907171.html