`
javatoyou
  • 浏览: 1017124 次
  • 性别: Icon_minigender_2
  • 来自: 北京
文章分类
社区版块
存档分类
最新评论

Asterisk 1.8 队列 分析

 
阅读更多

上一篇文章 讨论了 invite请求最终 走到 dialplan,走到拨号方案后 具体做什么动作由用户决定,对于呼叫中心应用,队列是必不可少的功能,本篇文档分析一下主叫打进系统,走到dialplan后 进队列,呼叫坐席。。

一切从队列开始。。。。

queue(nama,timout,tT,,.),

queue_exec为队列入口函数。首先解析 队列参数。

AST_DECLARE_APP_ARGS(args,
AST_APP_ARG(queuename);
AST_APP_ARG(options);
AST_APP_ARG(url);
AST_APP_ARG(announceoverride);
AST_APP_ARG(queuetimeoutstr);
AST_APP_ARG(agi);
AST_APP_ARG(macro);
AST_APP_ARG(gosub);
AST_APP_ARG(rule);
AST_APP_ARG(position);

这里 1.8版本增加了参数 position

此参数指定 主叫进入队列后在所有主叫中的排队位置,为1,则此客户优先级最高,优先接听,,这是个参数在队列内有很多等待电话时有用。

此参数跟 QUEUE_PRIO 类似,但position更灵活。。。

队列参数解析完毕 调用 join_queue 进入队列。。。同时指定进入队列失败原因。。。设置 QUEUESTATUS.

join_queue内部。首先调用 load_realtime_queue,加载队列坐席信息。。。

load_realtime_queue 内部,首先根据 队列名字 在内存中查找 队列是否存在,有,则返回在内存中的位置,否则为NULL

/* Find the queue in the in-core list first. */
q = ao2_t_find(queues, &tmpq, OBJ_POINTER, "Look for queue in memory first");

这里ao2_t_find为hash查找,速度应该比链表快多了,这是1.6,1.8版本提供的。

if (!q || q->realtime)

如果没找到 ,或者找到了&&队列是realtime(数据库保存队列)的,则,调用ast_load_realtime,从数据库中加载队列信息。

所以,对于realtime队列,不管找不找到,都会执行数据库加载动作。

queue_vars = ast_load_realtime("queues", "name", queuename, SENTINEL);
if (queue_vars) {
member_config = ast_load_realtime_multientry("queue_members", "interface LIKE", "%", "queue_name", queuename, SENTINEL);
if (!member_config) {
ast_log(LOG_ERROR, "no queue_members defined in your config (extconfig.conf)./n");
ast_variables_destroy(queue_vars);
return NULL;
}
}
if (q) {
prev_weight = q->weight ? 1 : 0;
}

首先根据队列名字加载队列,如果数据库中存在此队列,则加载此队列的坐席(queue_member表),没有坐席会则函数退出。这里做这两个步骤的目的是做一些溢出处理,如数据库没有队列,或队列无坐席等。

接下来 ,如果内存中存在此队列,则设置队列优先级。

然后调用 find_queue_by_name_rt 函数,此函数很重要,他的功能为 做数据库信息与内存的同步。。。返回全新的队列 到q

q = find_queue_by_name_rt(queuename, queue_vars, member_config);

*****

queue_vars 保存的是 数据库队列信息,member_config 保存的是 数据库内此队列的坐席信息。

下面分析一下find_queue_by_name_rt

首先处理 不是 realtime的队列。接下来处理realtime队列。

/* Check if queue is defined in realtime. */
if (!queue_vars) {

1。根据数据库中是否存在队列判断是否为realtime队列,,这里,如果数据库中队列已经没了,但内存中该队列还存在(不是queue.conf静态配置的队列),,此时,设置队列死亡。q->dead = 1;。实际就是在内存中删除,

2。数据库中存在,内存中不存在。创建。。。

alloc_queue 分配队列内存。。。同时指定析构函数。

clear_queue,清理队列,初始化队列,首先设置队列策略,找不到就设置默认ringall,这里这么早设置队列策略的原因为lineer策略依赖于

此设置,因为下面为初始化坐席到内存中时要用到策略。接下来把此队列放到全局队列容器中queues_t_link(queues, q, "Add queue to container");

接下来调用 init_queue 初始化队列默认值。这里,如果内存中已经存在,则也重置队列参数。原因是在与数据库同步。

包括 队列参数queue.conf 中 autopause,timeoutpriority 等的初始化。。坐席的分配。。。

q->dead = 0;
q->retry = DEFAULT_RETRY;
q->timeout = DEFAULT_TIMEOUT;
q->maxlen = 0;
q->announcefrequency = 0;
q->minannouncefrequency = DEFAULT_MIN_ANNOUNCE_FREQUENCY;
q->announceholdtime = 1;
q->announcepositionlimit = 10; /* Default 10 positions */
q->announceposition = ANNOUNCEPOSITION_YES; /* Default yes */
q->roundingseconds = 0; /* Default - don't announce seconds */
q->servicelevel = 0;
q->ringinuse = 1;
q->setinterfacevar = 0;
q->setqueuevar = 0;
q->setqueueentryvar = 0;
q->autofill = autofill_default;
q->montype = montype_default;
q->monfmt[0] = '/0';
q->reportholdtime = 0;
q->wrapuptime = 0;
q->penaltymemberslimit = 0;
q->joinempty = 0;
q->leavewhenempty = 0;
q->memberdelay = 0;
q->maskmemberstatus = 0;
q->eventwhencalled = 0;
q->weight = 0;
q->timeoutrestart = 0;
q->periodicannouncefrequency = 0;
q->randomperiodicannounce = 0;
q->numperiodicannounce = 0;
q->autopause = QUEUE_AUTOPAUSE_OFF;
q->timeoutpriority = TIMEOUT_PRIORITY_APP;
if (!q->members) {
if (q->strategy == QUEUE_STRATEGY_LINEAR)
/* linear strategy depends on order, so we have to place all members in a single bucket */
q->members = ao2_container_alloc(1, member_hash_fn, member_cmp_fn);
else
q->members = ao2_container_alloc(37, member_hash_fn, member_cmp_fn);
}

队列参数默认值初始化完毕,然后 调用 queue_set_param 用数据库内的队列参数值填充内存队列的参数。

if (!strcasecmp(param, "musicclass") ||
!strcasecmp(param, "music") || !strcasecmp(param, "musiconhold")) {
ast_string_field_set(q, moh, val);
} else if (!strcasecmp(param, "announce")) {
ast_string_field_set(q, announce, val);
} else if (!strcasecmp(param, "context")) {
ast_string_field_set(q, context, val);
} else if (!strcasecmp(param, "timeout")) {
q->timeout = atoi(val);
if (q->timeout < 0)
q->timeout = DEFAULT_TIMEOUT;
} else if (!strcasecmp(param, "ringinuse")) {
q->ringinuse = ast_true(val);。。

。。。。。。。。。。。。。

此函数设置的参数为单个队列内的所有参数。。至此,数据库队列配置已经同步到内存。

下面做 坐席 的同步。

while ((m = ao2_iterator_next(&mem_iter))) {
q->membercount++;
if (m->realtime)
m->dead = 1;
ao2_ref(m, -1);
}

把内存中坐席信息放到 容器中,,标志坐席为dead,原因是如果坐席在数据库中已不存在,则在内存中删除掉。。

然后 执行 while ((interface = ast_category_browse(member_config, interface))) {

根据 坐席的interface 在数据库中按行查找,

while ((interface = ast_category_browse(member_config, interface))) {
rt_handle_member_record(q, interface,
ast_variable_retrieve(member_config, interface, "uniqueid"),
S_OR(ast_variable_retrieve(member_config, interface, "membername"),interface),
ast_variable_retrieve(member_config, interface, "penalty"),
ast_variable_retrieve(member_config, interface, "paused"),
S_OR(ast_variable_retrieve(member_config, interface, "state_interface"),interface));
}

rt_handle_member_record 函数 用 内存中的坐席信息(参数q)与数据库中的坐席信息member_config,更新。

这里 ,interface作为坐席全局唯一标识。rt_handle_member_record 函数只更新坐席的penalty/paused state, 参数 ,如果内存中没有则创建坐席create_queue_member,创建后获取坐席的状态,调用get_queue_member_status。。。。

此函数 调用 ast_devce_status,-----》_ast_device_state-》sip_devicestate -》find_peer()->realtime_peer->build_peer--》ao2_t_alloc 创建peer

build_peer 为数据库表sipeers内关于sip peer参数的解析,如 rtp_engine,disallow,registertrying,rtptimeout,

rtpholdtimeout,fullcontact,dns解析等等一系列关于此sip peer的参数。此函数填充完之后返回创建的peer实体。

最后 返回realtime_peer

ast_debug(3, "-REALTIME- loading peer from database to memory. Name: %s. Peer objects: %d/n", peer->name, rpeerobjs);

/*! /brief Build peer from configuration (file or realtime static/dynamic) */
static struct sip_peer *build_peer(const char *name, struct ast_variable *v, struct ast_variable *alt, int realtime, int devstate_only)

然后删除那些在数据库中已经删除的坐席。。。

/*! /brief Part of PBX channel interface
/note
/parReturn values:---

If we have qualify on and the device is not reachable, regardless of registration
state we return AST_DEVICE_UNAVAILABLE

For peers with call limit:
- not registeredAST_DEVICE_UNAVAILABLE
- registered, no callAST_DEVICE_NOT_INUSE
- registered, active callsAST_DEVICE_INUSE
- registered, call limit reachedAST_DEVICE_BUSY
- registered, onholdAST_DEVICE_ONHOLD
- registered, ringingAST_DEVICE_RINGING

For peers without call limit:
- not registeredAST_DEVICE_UNAVAILABLE
- registeredAST_DEVICE_NOT_INUSE
- fixed IP (!dynamic)AST_DEVICE_NOT_INUSE

Peers that does not have a known call and can't be reached by OPTIONS
- unreachableAST_DEVICE_UNAVAILABLE

If we return AST_DEVICE_UNKNOWN, the device state engine will try to find
out a state by walking the channel list.

The queue system (/ref app_queue.c) treats a member as "active"
if devicestate is != AST_DEVICE_UNAVAILBALE && != AST_DEVICE_INVALID

When placing a call to the queue member, queue system sets a member to busy if
!= AST_DEVICE_NOT_INUSE and != AST_DEVICE_UNKNOWN

*/
static int sip_devicestate(void *data)

函数sip_devicestate 是坐席第一次创建时获取状态的入口,首先调用find_peer在内存中查找坐席(根据peer name即坐席名字 或者根据IP地址。。),第一次肯定找不到,

[Dec 22 09:59:46] DEBUG[6453]: devicestate.c:340 _ast_device_state: No provider found, checking channel drivers for SIP - 10000322000
[Dec 22 09:59:46] DEBUG[6453]: chan_sip.c:24955 sip_devicestate: Checking device state for peer 10000322000
[Dec 22 09:59:46] DEBUG[6453]: pbx.c:3662 ast_str_substitute_variables_full: Evaluating 'CURL(

所以调用realtime_peer 查找数据库。。。。if (!p && (realtime || devstate_only)) {
ast_log(LOG_DEBUG,"not find peer ,so build realtime peer realtime_peer/n");
p = realtime_peer(peer, addr, devstate_only, which_objects);

这里是 realtime_peer 函数是realtime peer从数据库注册到系统的关键.

/*! /brief realtime_peer: Get peer from realtime storage
* Checks the "sippeers" realtime family from extconfig.conf
* Checks the "sipregs" realtime family from extconfig.conf if it's configured.
* This returns a pointer to a peer and because we use build_peer, we can rest
* assured that the refcount is bumped.
*/
static struct sip_peer *realtime_peer(const char *newpeername, struct ast_sockaddr *addr, int devstate_only)

这里,此函数先根据 坐席名字且host字段为dynamic的坐席加载sippeers 数据表,

var = ast_load_realtime("sippeers", "name", newpeername, "host", "dynamic", SENTINEL);
if (!var && addr) {
var = ast_load_realtime("sippeers", "name", newpeername, "host", ast_sockaddr_stringify_addr(addr), SENTINEL);

如果根据坐席名字且host字段为dynamic找不到坐席,且坐席有地址,则根据坐席地址加载数据库表sippeers,如果仍然在数据库中找不到,则只根据坐席名字查找坐席。

if (!var) {
var = ast_load_realtime("sippeers", "name", newpeername, SENTINEL);
/*!/note
* If this one loaded something, then we need to ensure that the host
* field matched. The only reason why we can't have this as a criteria
* is because we only have the IP address and the host field might be
* set as a name (and the reverse PTR might not match).
*/

理顺一下:

1:根据坐席名字及host=dyanmic 加载表sipeers,

2:找不到,&&有地址,则根据 坐席名字和地址加载。

3:仍然找不到:根据坐席名字加载 sippeers

对于realtime应用 采用坐席名字作为索引时,应该 在第一步即在数据库中找到。

接下来 把数据库中的sip 坐席注册到 asterisk , 调用 build_peer

/* Peer found in realtime, now build it in memory */
peer = build_peer(newpeername, var, varregs, TRUE, devstate_only);

然后 判断 是否 有 开启 RT catch realtime peer,和auto clear peer选项。这两个选项会影响坐席在系统的存活时间。。

最后把创建好的坐席连接到peers链表。。。,如果坐席地址非空,则同时加载到 peers_by_ip列表。

至此,由于要获取坐席状态而引发的一系列动作终于返回到sip_devstate...,也就返回到 开始的join_queue函数,调用完load_realtime_queue之后,首先判断队列内坐席状态。。。如果没有可用坐席,则直接返回,如果开启了joinempty,则主叫会溢出队列,

同时设置溢出原因为QUEUE_JOINEMPTY,如果队列等待数大于等于队列设置的最大等待数,则设置为队列满,。

如果没有溢出,则把主叫放在合适的位置。。。这里,queue_ent为主叫链表,用当前主叫的优先级和当前链表内主叫比较。插入合适位置。

最后 发射AMI 事件 join

返回queue_exec,主叫已经进入队列,听音乐吧。。。

if (ringing) {
ast_indicate(chan, AST_CONTROL_RINGING);
} else {
ast_moh_start(chan, qe.moh, NULL);
}

调用wait_our_turn 排队状态,只有坐席可用时才返回,否则一直在队列中听音乐或队列播放给主叫一些额外提示信息(如提示客户耐心等待,您在当前队列中的位置等信息)。。。一旦坐席可用,主叫直接退出。。。

wait_our_turn 是一个死循环,内部先调用is_our_turn函数判断是否可以呼叫坐席,是则马上退出循环,否则,返回,查看是否自己在队列的超时时间已到,同时检查这个过程当中队列内的坐席是否已经没了,设置队列状态为leavewhenempty,

同时,等待过程中如果开启播报主叫位置信息则播报之,还有周期性播报其他信息(友情提示),然后停顿一秒,避免对CPU压力,

这一秒内接收按键,因为队列内主叫可以按键离开队列。。。这样,每个主叫在wait_our_turn一直下去,直到有可用坐席位置。

下面wait_our_turn返回,说明轮到我啦。。。。这时仍然检查超时时间到没到,语音播报还继续。。因为还没开始呼叫坐席。。

调用try_calling 呼叫队列内所有闲置坐席。。。

/*! /brief A large function which calls members, updates statistics, and bridges the caller and a member
*
* Here is the process of this function
* 1. Process any options passed to the Queue() application. Options here mean the third argument to Queue()
* 2. Iterate trough the members of the queue, creating a callattempt corresponding to each member. During this
* iteration, we also check the dialed_interfaces datastore to see if we have already attempted calling this
* member. If we have, we do not create a callattempt. This is in place to prevent call forwarding loops. Also
* during each iteration, we call calc_metric to determine which members should be rung when.
* 3. Call ring_one to place a call to the appropriate member(s)
* 4. Call wait_for_answer to wait for an answer. If no one answers, return.
* 5. Take care of any holdtime announcements, member delays, or other options which occur after a call has been answered.
* 6. Start the monitor or mixmonitor if the option is set
* 7. Remove the caller from the queue to allow other callers to advance
* 8. Bridge the call.
* 9. Do any post processing after the call has disconnected.
*
* /param[in] qe the queue_ent structure which corresponds to the caller attempting to reach members
* /param[in] options the options passed as the third parameter to the Queue() application
* /param[in] announceoverride filename to play to user when waiting
* /param[in] url the url passed as the fourth parameter to the Queue() application
* /param[in,out] tries the number of times we have tried calling queue members
* /param[out] noption set if the call to Queue() has the 'n' option set.
* /param[in] agi the agi passed as the fifth parameter to the Queue() application
* /param[in] macro the macro passed as the sixth parameter to the Queue() application
* /param[in] gosub the gosub passed as the seventh parameter to the Queue() application
* /param[in] ringing 1 if the 'r' option is set, otherwise 0
*/
static int try_calling(struct queue_ent *qe, const char *options, char *announceoverride, const char *url, int *tries, int *noption, const char *agi, const char *macro, const char *gosub, int ringing)

。。。。。。。。。。。。。。。。

解析参数,遍历坐席列表,把可以呼叫的坐席放到一个链表中,然后调用ring_one尝试呼叫坐席。这里还做了一些参数解析,比如 Rt,设置到 bridge_channel结构中,

当坐席与主叫成功bridge后执行。。。ring_entry 做最后检查后 调用ast_request呼叫坐席。

ring_one内部调用find_best 在之前的链表中找到最合适的一个坐席,调用Ring_entery 呼叫之,,,

调用ring_entry时,说明呼叫哪个坐席已经确定,但是在呼叫之前先做一下判断坐席状态,

* - Agent on call
* - Agent is paused
* - Wrapup time not expired
* - Priority by another queue

如果为上述几个状态则说明呼叫失败,设置cdr(主叫channel),如果不是以上几种状态则调用ast_request 请求创建 外乎channle.

如果创建失败,则返回ring_one,找下一个坐席,继续呼叫。。。

如果ast_request 请求成功。则往下走,设置坐席channle的相关参数,如 坐席 callerId等,,

/* If the new channel has no callerid, try to guess what it should be */
if (!tmp->chan->caller.id.number.valid) {
if (qe->chan->connected.id.number.valid) {
struct ast_party_caller caller;

ast_party_caller_set_init(&caller, &tmp->chan->caller);
caller.id = qe->chan->connected.id;
caller.ani = qe->chan->connected.ani;
ast_channel_set_caller_event(tmp->chan, &caller, NULL);
} else if (!ast_strlen_zero(qe->chan->dialed.number.str)) {
ast_set_callerid(tmp->chan, qe->chan->dialed.number.str, NULL, NULL);
} else if (!ast_strlen_zero(S_OR(qe->chan->macroexten, qe->chan->exten))) {
ast_set_callerid(tmp->chan, S_OR(qe->chan->macroexten, qe->chan->exten), NULL, NULL);
}
tmp->dial_callerid_absent = 1;
}

/* Inherit specially named variables from parent channel */
ast_channel_inherit_variables(qe->chan, tmp->chan);
ast_channel_datastore_inherit(qe->chan, tmp->chan);

坐席channle继承主叫channle的变量。。。

if (ast_cdr_isset_unanswered()) {
/* they want to see the unanswered dial attempts! */
/* set up the CDR fields on all the CDRs to give sensical information */
ast_cdr_setdestchan(tmp->chan->cdr, tmp->chan->name);
strcpy(tmp->chan->cdr->clid, qe->chan->cdr->clid);
strcpy(tmp->chan->cdr->channel, qe->chan->cdr->channel);
strcpy(tmp->chan->cdr->src, qe->chan->cdr->src);
strcpy(tmp->chan->cdr->dst, qe->chan->exten);
strcpy(tmp->chan->cdr->dcontext, qe->chan->context);
strcpy(tmp->chan->cdr->lastapp, qe->chan->cdr->lastapp);
strcpy(tmp->chan->cdr->lastdata, qe->chan->cdr->lastdata);
tmp->chan->cdr->amaflags = qe->chan->cdr->amaflags;
strcpy(tmp->chan->cdr->accountcode, qe->chan->cdr->accountcode);
strcpy(tmp->chan->cdr->userfield, qe->chan->cdr->userfield);
}

如果 cdr开启记录坐席未接,则此处新创建一个坐席侧CDR,以备坐席未应答时记录一条CDR.这里的cdr记录基本上是从主叫channle继承过来的。。。

最后调用 ast_call 呼叫坐席。。。失败则返回,继续呼叫下一个坐席。。。

/* Place the call, but don't wait on the answer */
if ((res = ast_call(tmp->chan, location, 0)))

失败则调用do_hang()挂掉 坐席侧,do_hang调用Ast_hanup,记录CDR.更新坐席状态。。。

如过开启发射AMI事件,则发射AgentCalled事件。ast_call不是一个阻塞函数,即其调用立即返回,等待坐席接听交给

wait_for_answer 处理。。。

下面返回看看wait_for_answer,(ring_one之后),

首先把正在呼叫的坐席的channle加入异步IO列表,用poll或epoll监听,等待过程中接收主叫及坐席的案件,并处理。。

wait_for_answer函数返回后说明等待坐席结束或者坐席接听,此时记录下一个需要呼叫的坐席,如果坐席未接听,记录CDR,

坐席接听了,则计算 等待时间等一些列参数,用于播报等待时间,如果在播报时间内坐席或主叫挂断则记录CDR,queue_log,调用Ast_hangup...

如果接听。。。

则停止播放等待音乐,调用ast_cdr_setdestchan 更新主叫channle的cdr deschan 字段为坐席channle.

然后 调用 ast_channel_make_compatible 桥接两者通话,实际上是处理媒体流兼容问题,Make sure channels are compatible

if (res < 0) {
ast_queue_log(queuename, qe->chan->uniqueid, member->membername, "SYSCOMPAT", "%s", "");
ast_log(LOG_WARNING, "Had to drop call because I couldn't make %s compatible with %s/n", qe->chan->name, peer->name);
record_abandoned(qe);
ast_cdr_failed(qe->chan->cdr);
ast_hangup(peer);
ao2_ref(member, -1);
return -1;
}

桥接失败则记录CDr 挂断,记录queue_log,为SYSCOMPAT,

如果成功则设置相关 参数,

至此 呼叫成功建立起来,

开启录音。。。处理 queue 参数,

记录queu_log CONNECT事件,发射 AgentConnect AMI事件,

然后把双方channle 交给features.c 的Ast_bridge_call ,以便双方通话过程中做其他特性动作,如转移,则交给feachuers.c内部函数处理。

最后,双方挂断,则记录是谁先挂的,并触发记录不同的queue_log事件。TRANSFER,COMPLETECALLER,COMPLETEAGENT。

最终 离开队列结束呼叫。。。

路线: queue->joinqueue->buildpeer->realtime_peer->wait_our_turn->is_our_turn->try_calling->ring_one->ring_enrty->sip_request_call->sip_alloc,->do_nat_->obproxy_get->ast_ouraddrfor->ast_sip_ouraddrfor->ast_codec_choose,,,sip_new-》sip_call。。

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics