在开源数据库中,(据笔者撰写本文时所知)支持并行建索引的只有 Ingres。但它并不是把单个索引的构建并行化,而是对同一张表同时并行地创建多个索引。
下面主要分析它如何协调主线程与各子线程之间的通信,所用版本为 ingres-10.1.0-114-gpl。
并行建索引的入口函数为 src/back/dmf/dmu/dm2upind.c 中的 STATUS dm2u_pindex(DM2U_INDEX_CB *index_cbs);
主、子线程通信的函数位于 src/back/dmf/dmu/dm2uputi.c:
DB_STATUS dm2u_pload_table(DM2U_MXCB *mxcbs, i4 *rec_cnt, DB_ERROR *dberr)
Ingres 采用多线程架构,所以建索引的主线程和各个子线程直接通过同一进程的内存交换数据,而不是通过操作系统的共享内存段。用于交换数据的区域由多个交换缓冲区(下文称"交换页面")组成,每个交换页面的头结构如下:
/*}
** Name: exch_buf
**
** Description:
** This structure is used to describe the individual exchange buffers
** (the "exchange pages" through which the parent thread hands base
** table records to the child index-build threads).  It contains the
** buffer's number in the round-robin list, a pointer to the record
** storage, the number of records stored, and a reader count: the
** parent sets exch_usr_cnt to the number of children when it publishes
** a full (or the final) buffer, and each child decrements it once it
** has read the buffer.  The parent only reuses a buffer after that
** count has dropped back to zero.
**
** Records in exch_buffer are stored back to back, each one as a DM_TID
** immediately followed by the record bytes.  The child derives the
** record length as ehdr_size / ehdr_rec_capacity - sizeof(DM_TID),
** which suggests ehdr_size is the byte size of one buffer (confirm).
**
** History:
** 10-apr-1998 (nanpr01)
** Created for Parallel Index Build.
** 05-may-2000 (stial01)
** Added rec_cnt to prototype for dm2u_pload_table
** (Review note: this is the header structure of each exchange page.)
*/
struct _EXCH_BUF {
CS_SEMAPHORE exch_buf_mutex; /* Protects the buffer contents: held
** exclusively by the parent while it
** fills the buffer and in shared mode
** by children while they read it
*/
i4 exch_bufno; /* buffer number for the exch buf;
** numbered from 1 up to ehdr_noofbufs,
** after which reader and writer wrap
** back to the first buffer
*/
char *exch_buffer; /* actual exchange buffer: storage for
** the (DM_TID, record) pairs
*/
i4 exch_noofrec; /* no of records currently held; fewer
** than ehdr_rec_capacity marks the
** last buffer of the load
*/
CS_SEMAPHORE exch_cnt_mutex; /* mutex to protect exch_usr_cnt and
** exch_visit while children update them
*/
i4 exch_usr_cnt; /* no of readers (children) that still
** have to read this buffer
*/
char exch_visit[512/BITSPERBYTE];/* Who visited this buffer: one bit
** per child, indexed by ccb_childno
** b115572: increase max concurrent
** indexes to 512
*/
};
// 注意:exch_buffer 的大小并不是常见的固定页面大小(如 8K),而是
// (sizeof(DM_TID) + 每个元组的长度) * 每个交换页面可容纳的元组数(no_of_records)。
/*}
** Name: child_cb
**
** Description:
** This structure is used to describe the control block passed to the
** child. In this control block, mxcb address and the exchange buffer
** header is passed.  The child also reports its final status and
** error back through this block.
**
** History:
** 10-apr-1998 (nanpr01)
** Created for Parallel Index Build.
** (Review note: this is the data structure each child thread receives
** from the parent thread.)
*/
struct _CHILD_CB {
i4 ccb_noofchild; /* No of fac. thread created */
EXCH_BUF_HDR *ccb_exch_hdr; /* Exchange Buffer header */
DM2U_MXCB *ccb_mxcb; /* mxcb for each create ind */
i4 ccb_childno; /* This contains the children
** number; it is also the bit index
** into each buffer's exch_visit map
*/
i4 ccb_thread_id; /* child thread id */
DB_STATUS ccb_status; /* child status */
DB_ERROR ccb_error; /* child error (a single DB_ERROR) */
};
/*
** Excerpt (parent side) from dm2u_pload_table: create the child
** (factotum) threads, stream every base table record into the exchange
** buffers, then wait for all children to finish.  Judging by no_of_ind
** also being passed to put_record as the number of readers, there is
** one child per index.  Local declarations, buffer allocation and
** cleanup are omitted from this excerpt.
*/
DB_STATUS dm2u_pload_table(DM2U_MXCB *mxcbs, i4 *rec_cnt, DB_ERROR *dberr)
/* Create the factotum threads for parallel index creation */
// The threads are created directly; each new thread is then linked into
// a list (that list handling has not been examined closely here).
// The child threads' entry and exit routines are set as follows
// (presumably inside create_index_threads, which is not shown):
//ftc.ftc_thread_entry = build_pindex;
//ftc.ftc_thread_exit = build_pindex_exit;
// i.e. build_pindex / build_pindex_exit are the routines each child runs.
status = create_index_threads(mxcbs, ccb, exch_hdr,
(EXCH_BUF *)exch_buf, no_of_ind, dberr);
// The parent keeps putting records into the current exchange page.  A
// page is only handed to the children once it is full; a partially
// filled page stays invisible to them unless it is the last page (end of
// table).  The loop is skipped entirely for online index builds.
while ( status == E_DB_OK && !end_of_file &&
!(m->mx_flags & MX_ONLINE_INDEX_BUILD) )
{
/* get a record from the base table */
/* At end of table this fails with E_DM0055_NONEXT; put_record treats
** that as EOF and sets end_of_file, which ends the loop. */
local_status = dm2r_get(m->mx_rcb, &tid, DM2R_GETNEXT,
dummy_record, dberr);
/* check to see if user interrupt occurred. */
/* An interrupt or force-abort is turned into an error status here;
** put_record then flags PARENT_ERROR so the children stop. */
if ( local_status == E_DB_OK && *(m->mx_rcb->rcb_uiptr) )
{
/* If XCB, check via SCB */
if ( m->mx_rcb->rcb_xcb_ptr )
{
dmxCheckForInterrupt(m->mx_rcb->rcb_xcb_ptr, &error);
if ( error )
SETDBERR(dberr, 0, error);
}
else if (*(m->mx_rcb->rcb_uiptr) & RCB_USER_INTR)
SETDBERR(dberr, 0, E_DM0065_USER_INTR);
/* check to see if force abort occurred. */
else if (*(m->mx_rcb->rcb_uiptr) & RCB_FORCE_ABORT)
SETDBERR(dberr, 0, E_DM010C_TRAN_ABORTED);
if ( dberr->err_code )
local_status = E_DB_ERROR;
}
/* Only successfully read records are counted. */
if (local_status == E_DB_OK)
*rec_cnt = *rec_cnt + 1;
/* put record in exchange buffer */
/* Called even when local_status is an error, so that put_record can
** handle EOF or propagate the error to the children. */
status = put_record(exch_hdr, &cur_buf, &tid, dummy_record,
reclen, no_of_ind, &end_of_file,
local_status, dberr);
}
/*
** We have finished loading the data to the child threads now
** and child threads can go concurrent
** We have to wait for all the children to finish.
** If one of them finishes with error, we have to tell others
** to stop at the earliest opportunities and return error
** when everyone is done.
*/
wait_for_index_finish(exch_hdr);
/*
** Excerpt from the child thread routine (presumably build_pindex, the
** factotum entry point; its header and local declarations are not shown).
**
** The child walks the exchange buffers round-robin.  For each buffer it:
**   1. takes exch_buf_mutex in SHARED mode, so several children can read
**      the same buffer at once, while it blocks as long as the parent
**      holds the mutex exclusively to fill that buffer;
**   2. stops with E_DB_WARN if an error has been flagged in the header;
**   3. if exch_usr_cnt is 0 the parent has not published the buffer yet:
**      release everything and yield;
**   4. if its own bit in exch_visit is set it already read this buffer in
**      an earlier round: release and yield until the parent refills it;
**   5. otherwise feeds every record into its sorter, then decrements
**      exch_usr_cnt, sets its exch_visit bit and, if it was the last
**      reader and the parent is waiting on this buffer, resumes the parent.
** A buffer holding fewer than ehdr_rec_capacity records is the last one.
** Once everything is read, the sorted records are loaded into the index.
*/
    ccb = (CHILD_CB *)ftx->ftx_data;
    exch_hdr = ccb->ccb_exch_hdr;
    m = ccb->ccb_mxcb;
    tp = m->mx_tpcb_next;

    /* round robin buffer for reading and that is why always start at 1 */
    cur_buf = exch_hdr->ehdr_exch_buffers;

    /* Every buffer slot is a DM_TID followed by the record itself. */
    reclen = (exch_hdr->ehdr_size/
        (exch_hdr->ehdr_rec_capacity) - sizeof(DM_TID));

    if ( (ccb->ccb_status = init_child_thread(tp, &wloc_mask, &ccb->ccb_error)) )
        return(ccb->ccb_status);

    /*
    ** For online index builds the parent skips its load loop, so there is
    ** nothing to read from the exchange buffers.
    */
    if (m->mx_flags & MX_ONLINE_INDEX_BUILD)
        return(ccb->ccb_status);

    /*
    ** This routine waits for parents to send data in exchange buffer
    ** until nomore data
    */
    while ( ccb->ccb_status == E_DB_OK && !allread )
    {
#ifdef xDEBUG
        TRdisplay("child %d Going to wait on buffer %d\n",
            ccb->ccb_childno, cur_buf->exch_bufno);
#endif
        /* Wait to get a buffer using the shared mutex */
        CSp_semaphore(FALSE, &cur_buf->exch_buf_mutex);

        /* check for errors before reading data */
        if (exch_hdr->ehdr_error_flag)
        {
            ccb->ccb_status = E_DB_WARN;
            /* if parent is waiting on this buffer .. wake him up */
            if (exch_hdr->ehdr_parent_status == cur_buf->exch_bufno)
            {
                exch_hdr->ehdr_parent_status = 0;
                CSresume(ftx->ftx_thread_id);
            }
            CSv_semaphore(&cur_buf->exch_buf_mutex);
            break;
        }
#ifdef xDEBUG
        TRdisplay("child %d Got buffer mutex for buffer %d\n",
            ccb->ccb_childno, cur_buf->exch_bufno);
#endif

        /* exch_cnt_mutex (exclusive) guards exch_usr_cnt and exch_visit */
        CSp_semaphore(TRUE, &cur_buf->exch_cnt_mutex);

        /*
        ** exch_usr_cnt == 0: the parent has not published this buffer yet
        ** (it is still being filled), so skip it for now.
        */
        if (!cur_buf->exch_usr_cnt)
        {
            /* Child switched to this buffer before the parent filled it */
#ifdef xDEBUG
            TRdisplay(" CHILD %d buffer %x %d has no users no_of_records %d\n",
                ccb->ccb_childno, cur_buf, cur_buf->exch_bufno,
                cur_buf->exch_noofrec );
#endif
            /* if parent is waiting on this buffer .. wake him up */
            if (exch_hdr->ehdr_parent_status == cur_buf->exch_bufno)
            {
                exch_hdr->ehdr_parent_status = 0;
                CSresume(ftx->ftx_thread_id);
            }
            CSv_semaphore(&cur_buf->exch_cnt_mutex);
            CSv_semaphore(&cur_buf->exch_buf_mutex);
            /* Let the parent have a chance to run */
            CSswitch();
            continue;
        }

        /* Have we visited this buffer already */
        if (BTtest(ccb->ccb_childno, (char *)&cur_buf->exch_visit))
        {
#ifdef xDEBUG
            TRdisplay("child %d already visited this buffer %d\n",
                ccb->ccb_childno, cur_buf->exch_bufno);
#endif
            /* Wait for other children to catch up */
            CSv_semaphore(&cur_buf->exch_cnt_mutex);
            CSv_semaphore(&cur_buf->exch_buf_mutex);
            /* Let the other children have a chance to run */
            CSswitch();
            continue;
        }
        CSv_semaphore(&cur_buf->exch_cnt_mutex);

        /* A buffer that is not full is the last one the parent sends. */
        if (cur_buf->exch_noofrec != exch_hdr->ehdr_rec_capacity)
        {
            /* This is the last buffer to read */
            allread = TRUE;
        }
#ifdef xDEBUG
        TRdisplay("child %d reading buffer %d end %d\n",
            ccb->ccb_childno, cur_buf->exch_bufno, allread);
#endif
        if ( (noofrecord = cur_buf->exch_noofrec) != 0 )
        {
            i4 nth;

            /* Feed every record of this buffer, in order, to our sorter. */
            while ( noofrecord > 0 && ccb->ccb_status == E_DB_OK )
            {
                nth = cur_buf->exch_noofrec - noofrecord;
                get_record(exch_hdr, cur_buf,
                    tp, nth, reclen, &tid);
                ccb->ccb_status = load_sorter(tp, &tid, &ccb->ccb_error);
                noofrecord--;
            }

            /* Done: drop our reader count and mark the buffer visited. */
            CSp_semaphore(TRUE, &cur_buf->exch_cnt_mutex);
            cur_buf->exch_usr_cnt--;
            BTset(ccb->ccb_childno, (char *)&cur_buf->exch_visit);
            if (!cur_buf->exch_usr_cnt)
            {
                /* if parent is waiting on this buffer .. wake him up */
                if (exch_hdr->ehdr_parent_status == cur_buf->exch_bufno)
                {
                    exch_hdr->ehdr_parent_status = 0;
                    CSresume(ftx->ftx_thread_id);
                }
            }
            CSv_semaphore(&cur_buf->exch_cnt_mutex);
        }
        CSv_semaphore(&cur_buf->exch_buf_mutex);
#ifdef xDEBUG
        TRdisplay("child %d finished reading this buffer %d\n",
            ccb->ccb_childno, cur_buf->exch_bufno);
#endif
        if ( ccb->ccb_status == E_DB_OK )
        {
            if (allread)
            {
                ccb->ccb_status = dmse_input_end(tp->tpcb_srt, &ccb->ccb_error);
            }
            else if (cur_buf->exch_bufno == exch_hdr->ehdr_noofbufs)
                /* reset back to 1 */
                cur_buf = exch_hdr->ehdr_exch_buffers;
            else
                cur_buf++;
        }
    }

    /* All data received: build the index from the sorted records. */
    if ( ccb->ccb_status == E_DB_OK )
        ccb->ccb_status = load_index(tp, wloc_mask, &ccb->ccb_error);
    else
    {
        /*
        ** Error path: end the sort; if that fails too, log it and keep
        ** whichever error code is numerically larger.
        */
        if ( dmse_end(tp->tpcb_srt, &local_dberr) )
        {
            uleFormat(&local_dberr, 0, (CL_ERR_DESC *)NULL, ULE_LOG, NULL,
                (char *)NULL, (i4)0, (i4 *)NULL, &local_err_code, 0);
            if (local_dberr.err_code > ccb->ccb_error.err_code)
                ccb->ccb_error = local_dberr;
        }
        tp->tpcb_srt = 0;
    }
    if (wloc_mask != NULL)
        dm0m_deallocate((DM_OBJECT **)&wloc_mask);
    return(ccb->ccb_status);
}
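子线程读交换页面时,是以共享模式(CSp_semaphore(FALSE, ...),源码注释为 "shared mutex")获取该页面的 exch_buf_mutex 的,因此多个子线程可以同时读同一个交换页面;子线程之间真正互斥的只是对 exch_usr_cnt 和 exch_visit 的更新(由 exch_cnt_mutex 以独占模式保护)。
主线程写页面时,并不是每写一条记录加一次锁,而是在填充整个页面期间一直以独占模式(CSp_semaphore(TRUE, ...))持有该页面的 exch_buf_mutex,直到页面写满(或读到表尾、出错)才释放。
所以在"一个写、多个读"的场景下:写者独占页面,读者之间可以并发;在主线程写满页面(设置 exch_usr_cnt 并释放互斥量)之前,子线程读不到其中的数据。
主线程放数据(put_record)和子线程取数据(get_record)的函数本身很简单,关键在于页面写满后切换到下一个页面的函数(switch_buffer),见下文。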
/*
** Excerpt from get_record (child side; its header and locals are not
** shown): copy the nth (DM_TID, record) slot of cur_buf into *tid and
** tp->tpcb_crecord.  The slot layout mirrors the one put_record writes.
*/
    p = cur_buf->exch_buffer + (nth * (reclen + sizeof(DM_TID)));
    MEcopy(p, sizeof(DM_TID), tid);
    p += sizeof(DM_TID);
    MEcopy(p, reclen, tp->tpcb_crecord);
#ifdef xDEBUG
    /* Now got it - so read it */
    TRdisplay("Child Record in buffer %d pos %d TID :",
        cur_buf->exch_bufno, nth);
    TRdisplay("%d\tRecord :", tid->tid_i4);
    for (i = 0; i < reclen; i++)
    {
        TRdisplay("%1c", tp->tpcb_crecord[i]);
    }
    TRdisplay("\nAddresses 0x%x 0x%x\n", p-sizeof(DM_TID), p);
#endif
    return(E_DB_OK);
}
/*{
** Name: put_record
**
** Description:
** This routine puts a record, already read from the base table by the
** caller, into the current exchange buffer.  If the current exchange
** buffer is full, it first gets the next exchange buffer by calling the
** switch_buffer routine, which publishes the full buffer to the children.
** The routine is also called with the caller's failing status so that
** end of table and read errors are passed on to the children.
**
** Inputs:
**      exch_buf_hdr        Exchange buffer header (capacity, error flag).
**      cur_buf_ptr         Buffer currently being filled.  Its
**                          exch_buf_mutex is expected to be held by the
**                          parent (switch_buffer acquires it exclusively);
**                          this routine releases it at end of table or
**                          on error.
**      tid                 TID of the record.
**      record              Record bytes, reclen long.
**      reclen              Record length.
**      noofchild           Number of children that must read each buffer.
**      local_status        Status of the caller's dm2r_get.
**      dberr               Error from that dm2r_get.
** Outputs:
**      cur_buf_ptr         Updated when a buffer switch happens.
**      end_of_file         Set to 1 when dm2r_get reported E_DM0055_NONEXT.
**      dberr               Cleared at end of table.
** Returns:
**      E_DB_OK
**      E_DB_ERROR          Read error (PARENT_ERROR is flagged to the
**                          children), or switch_buffer failed because an
**                          error flag was raised.
** Exceptions:
**      none
**
** Side Effects:
**      Releases the current buffer's exch_buf_mutex at end of table or
**      on a read error.
**
** History:
** 10-apr-98 (nanpr01)
** Created.
** 05-may-2000 (stial01, gupsh01)
** If E_DM0055_NONEXT, and current buffer is full, the parent
** switches to the next buffer. Set exch_usr_cnt and exch_visit
** in the next buffer.
** Review: clear exch_visit with '\0'; the former '/0' is a
** multi-character constant with an implementation-defined value
** (typically '0'), which left bits set in the visit bitmap.
*/
static STATUS
put_record(
EXCH_BUF_HDR *exch_buf_hdr,
EXCH_BUF **cur_buf_ptr,
DM_TID *tid,
char *record,
i4 reclen,
i4 noofchild,
i4 *end_of_file,
i4 local_status,
DB_ERROR *dberr)
{
    char *start;
#ifdef xDEBUG
    i4 i;
#endif
    EXCH_BUF *cur_buf = *cur_buf_ptr;
    STATUS status;

    if (local_status == E_DB_OK)
    {
        /* Buffer full: hand it to the children and move to the next one. */
        if ((cur_buf->exch_noofrec + 1) > exch_buf_hdr->ehdr_rec_capacity)
        {
            status = switch_buffer(exch_buf_hdr, cur_buf_ptr, noofchild);
            if (status != E_DB_OK)
                return(E_DB_ERROR);
            cur_buf = *cur_buf_ptr;
        }
        /* Slot layout: DM_TID immediately followed by the record. */
        start = cur_buf->exch_buffer +
            (cur_buf->exch_noofrec * (reclen + sizeof(DM_TID)));
        MEcopy((char*)tid, sizeof(DM_TID), start);
        MEcopy(record, reclen, start+sizeof(DM_TID));
#ifdef xDEBUG
        TRdisplay("Parent Record in buffer %d pos %d TID: ",
            cur_buf->exch_bufno,
            cur_buf->exch_noofrec);
        TRdisplay("%d\t Record:", tid->tid_i4);
        for (i = 0; i < reclen; i++)
        {
            TRdisplay("%1c", record[i]);
        }
        TRdisplay("\n Addresses 0x%x 0x%x\n", start, start+sizeof(DM_TID));
#endif
        (cur_buf->exch_noofrec)++;
        return(E_DB_OK);
    }

    /*
    ** End of table: publish the final buffer.  It always holds fewer than
    ** ehdr_rec_capacity records (possibly none), which is how the children
    ** recognise it as the last one.
    */
    if (local_status == E_DB_ERROR && dberr->err_code == E_DM0055_NONEXT)
    {
        /* if the current buffer is full, let children read it */
        if ((cur_buf->exch_noofrec + 1) > exch_buf_hdr->ehdr_rec_capacity)
        {
            status = switch_buffer(exch_buf_hdr, cur_buf_ptr, noofchild);
            if (status != E_DB_OK)
                return(E_DB_ERROR);
            cur_buf = *cur_buf_ptr;
        }
        /*
        ** Always set exch_usr_cnt and exch_visit when parent gets EOF
        ** so that the child can detect if it switches to a buffer
        ** before the parent.
        */
        cur_buf->exch_usr_cnt = noofchild;
        MEfill(sizeof(cur_buf->exch_visit), '\0', &cur_buf->exch_visit);
        CSv_semaphore(&cur_buf->exch_buf_mutex);
        /*
        ** End of table - Tell children to start load phase
        */
        CLRDBERR(dberr);
        *end_of_file = 1;
        return(E_DB_OK);
    }

    /* error occurred while reading - so bail out */
    /* Tell children that parent got error */
    exch_buf_hdr->ehdr_error_flag = PARENT_ERROR;
    CSv_semaphore(&cur_buf->exch_buf_mutex);
    return(E_DB_ERROR);
}
/* done with this buffer */
//经过下面的两个操作,子线程才能看到当前页面
cur_buf->exch_usr_cnt = noofchild;
MEfill(sizeof(cur_buf->exch_visit), '/0', &cur_buf->exch_visit);
/* Is it last buffer */
if (cur_buf->exch_bufno == exch_buf_hdr->ehdr_noofbufs)
{
/* set the next buffer to 1 */
next_buf = exch_buf_hdr->ehdr_exch_buffers;
}
else
{
next_buf = cur_buf;
next_buf++;
}
CSv_semaphore(&cur_buf->exch_buf_mutex);
CSp_semaphore(TRUE, &next_buf->exch_buf_mutex);
//下面的保证,空闲页面必须是每个子线程读过的
while ( exch_buf_hdr->ehdr_error_flag ||
next_buf->exch_usr_cnt > 0 )
{
/*
** check the error code here for children & if they wake
** you up for error
*/
if (exch_buf_hdr->ehdr_error_flag)
{
#ifdef xDEBUG
TRdisplay("Parent saw the error and going to exit routines for children to finish/n");
#endif
/* we have now seen child raised error ... hence exit */
CSv_semaphore(&next_buf->exch_buf_mutex);
return(E_DB_ERROR);
}
/* Tell children we're waiting on this buffer */
exch_buf_hdr->ehdr_parent_status = next_buf->exch_bufno;
CSv_semaphore(&next_buf->exch_buf_mutex);
CSsuspend(0,0,0);
/* CSresume-er will clear ehdr_parent_status */
CSp_semaphore(TRUE, &next_buf->exch_buf_mutex);
/* Cancel any extra resumes from those pesky kids */
CScancelled((PTR)0);
}
//清空页面中存入的元组数
next_buf->exch_noofrec = 0;
MEfill(sizeof(next_buf->exch_visit), '/0', &next_buf->exch_visit);
*cur_buf_ptr = next_buf;
return(E_DB_OK);
}
由此实现了"一个线程写、多个线程读"的模式:每个交换页面都必须被所有子线程读过(exch_usr_cnt 减到 0)之后才会被主线程复用,因此每个读线程读到的数据完全相同。