
Implementation of Thread Synchronization Objects

March 8, 2014
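
The code below builds three synchronization primitives on top of the Linux futex system call, all sharing one small struct futex: a counting semaphore (futex_sema_down / futex_sema_up), a countdown-style condition (futex_cond_wait / futex_cond_signal), and a manual-reset event (futex_event_wait / futex_event_signal / futex_event_reset). The post does not show the helpers it relies on (atomic_add, AtomicGetValue, AtomicSetValue, AtomicSetValueIf, sys_futex, mfence_c, nop, thread_yield, RaiseError). What follows is a minimal sketch of how they might be defined with GCC builtins on Linux/x86; treat it as an assumption, not the author's original code. Note that the rest of the listing requires atomic_add, AtomicSetValue and AtomicSetValueIf to return the old value.

#define _GNU_SOURCE 1	/* for syscall() */
#include <errno.h>
#include <stdbool.h>
#include <time.h>
#include <assert.h>
#include <unistd.h>
#include <sched.h>
#include <sys/syscall.h>
#include <linux/futex.h>

/* Hypothetical wrappers: the original post does not show their definitions. */
static inline int
atomic_add(volatile int* p, int v)	/* returns the OLD value (fetch-and-add) */
{
	return __atomic_fetch_add(p, v, __ATOMIC_SEQ_CST);
}

#define AtomicGetValue(x)            __atomic_load_n(&(x), __ATOMIC_SEQ_CST)
#define AtomicSetValue(x, v)         __atomic_exchange_n(&(x), (v), __ATOMIC_SEQ_CST)   /* returns the old value */
#define AtomicSetValueIf(x, v, cmp)  __sync_val_compare_and_swap(&(x), (cmp), (v))      /* CAS, returns the old value */

static inline void mfence_c(void)     { __atomic_thread_fence(__ATOMIC_SEQ_CST); }
static inline void nop(void)          { __asm__ __volatile__("pause"); }	/* x86 spin-wait hint */
static inline void thread_yield(void) { sched_yield(); }

/* Thin wrapper over futex(2): 0 or the number of woken waiters on success,
 * -1 with errno set on failure. */
static inline long
sys_futex(volatile int* uaddr, int op, int val, struct timespec* timeout)
{
	return syscall(SYS_futex, uaddr, op, val, timeout, NULL, 0);
}

/* The original error macro is not shown either; fail loudly on "impossible" branches. */
#define RaiseError(msg)	assert(!#msg)
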
struct futex {
	volatile int lock;	/* futex word that waiters sleep on via sys_futex() */
	volatile int count;	/* semaphore count / condition countdown / event state */
};

/* Sentinel meaning "event signaled": negative enough that one increment per
 * waiter in futex_event_wait() keeps it negative. */
#define LARGE_ENOUGH_NEGATIVE			-0x7fffffff

#ifdef	__cplusplus
extern "C" {
#endif

static inline void
futex_init(struct futex* pf, int count)
{
	pf->lock = 0;
	pf->count = count;
}

/* Return value:
 *         0: okay
 * ETIMEDOUT: timeout
 *     EINTR: interrupted
 */
static inline int 
futex_sema_down(struct futex* pf, struct timespec* timeout, bool interruptable)
{
	int n = atomic_add(&pf->count, -1);	/* old value: > 0 means a permit was free */
	if (n <= 0) {
retry:
        if (0 == sys_futex(&pf->lock, FUTEX_WAIT, 0, timeout)) {
			return 0;
        }

		switch (errno) {
		case ETIMEDOUT: 
			atomic_add(&pf->count, 1); 
			return ETIMEDOUT;
		case EINTR: 
			if (!interruptable)
				goto retry;
			atomic_add(&pf->count, 1); 
			return EINTR;
		default:
			RaiseError(IMPOSSIBLE__Can_not_lock_in_futex_sema_down);
		}
	}
	return 0;
}

/* Return value:
 *  1: woke up one waiter
 *  0: no one was waiting
 */
static inline int
futex_sema_up(struct futex* pf)
{
	int retry;
	int n = atomic_add(&pf->count, 1);	/* old value: < 0 means a waiter is blocked (or about to block) */
	if (n < 0) {
        retry = 10;
		while (1 != (n=sys_futex(&pf->lock, FUTEX_WAKE, 1, NULL))) {
			/* The downer has decremented the count but has not yet started waiting
			 * (it may have been interrupted near the retry label in futex_sema_down
			 * above), so spin briefly and retry the wake.
			 */
            if (retry --) { 
                nop(); 
            } 
            else { 
                retry = 10; 
                thread_yield();
            }
		}
		return n;
	}
	return 0;
}
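
/* Usage sketch (not part of the original post): the two functions above form a
 * counting semaphore. futex_init(&sem, N) grants N permits; futex_sema_down()
 * takes one, blocking when none are left (the count goes negative by the number
 * of waiters), and futex_sema_up() returns one, waking a single waiter if any.
 * The consumer loop below is hypothetical, only to show the calling convention
 * with a relative timeout.
 */
static inline void
example_sema_consumer(struct futex* sem)
{
	struct timespec ts;
	ts.tv_sec = 1;		/* wait at most one second per attempt */
	ts.tv_nsec = 0;
	for (;;) {
		int rc = futex_sema_down(sem, &ts, true);
		if (0 == rc) {
			/* got a permit: consume one queued item here, then loop */
		}
		else {
			/* ETIMEDOUT or EINTR: no permit obtained, stop trying */
			break;
		}
	}
}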

/* Return value:
 *		   0: okay
 * ETIMEDOUT: timeout
 *     EINTR: interrupted
 */
static inline int 
futex_cond_wait(struct futex* pf, struct timespec* timeout, bool interruptable)
{
    /* It is unclear whether this is a Linux kernel bug, but sometimes
     * sys_futex(.., FUTEX_WAIT, ..) returns 0 even though the condition is not
     * yet satisfied, so the condition has to be re-checked after every return.
     * Note that errno is only meaningful when sys_futex() fails.
     */
    while (0 < AtomicGetValue(pf->count)) {
        if (0 == sys_futex(&pf->lock, FUTEX_WAIT, 0, timeout)) {
            continue;   /* woken (possibly spuriously): re-check the condition */
        }
        switch (errno) {
        case ETIMEDOUT:
            return ETIMEDOUT;
        case EINTR:
            if (interruptable) {
                return EINTR;
            }
            /* not interruptable: fall through and wait again */
        default:
            break;
        }
    }
    return 0;
}

/* Return value:
 *   the number of woken waiters
 */
static inline int
futex_cond_signal(struct futex* pf)
{
	int n = atomic_add(&pf->count, -1);
	if (1 == n) {	/* count just went from 1 to 0: the condition is now satisfied */
		pf->lock = 1;
		mfence_c();
		/* wake every waiter; 65535 is assumed to exceed the number of waiters */
		return sys_futex(&pf->lock, FUTEX_WAKE, 65535, NULL);
	}
	return 0;
}

static inline int
futex_cond_revoke(struct futex* pf)
{
	(void)pf;	/* unused until revoke is implemented */
	// TODO:
	return 0;
}
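
/* Usage sketch (not part of the original post): used this way, the pair
 * futex_cond_wait()/futex_cond_signal() behaves like a countdown latch.
 * Initialize the count to the number of expected signals; waiters block until
 * futex_cond_signal() has been called that many times, at which point all of
 * them are released at once. The function below is hypothetical, only to
 * illustrate the calling convention.
 */
static inline void
example_wait_for_workers(struct futex* latch, int nworkers)
{
	futex_init(latch, nworkers);	/* expect nworkers calls to futex_cond_signal() */
	/* ... start nworkers threads; each calls futex_cond_signal(latch) when it finishes ... */
	while (EINTR == futex_cond_wait(latch, NULL, true)) {
		/* interrupted before the count reached zero: keep waiting */
	}
}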

/* Return value:
 *         0: okay
 * ETIMEDOUT: timeout
 *     EINTR: interrupted
 */
static inline int
futex_event_wait(struct futex* pf, struct timespec* timeout, bool interruptable)
{
    int n = atomic_add(&pf->count, 1);	/* old value: >= 0 means the event is not signaled yet */
    if (0 <= n) {
retry:
        if (0 == sys_futex(&pf->lock, FUTEX_WAIT, 0, timeout))
            return 0;

        switch (errno) {
        case ETIMEDOUT: 
            atomic_add(&pf->count, -1); 
            return ETIMEDOUT;
        case EINTR: 
            if (!interruptable)
                goto retry;
            atomic_add(&pf->count, -1); 
            return EINTR;
        default:
            RaiseError(IMPOSSIBLE__Can_not_lock_in_futex_sema_down);
        }
    }
    else {	/* already signaled: restore the sentinel disturbed by the atomic_add above */
        AtomicSetValue(pf->count, LARGE_ENOUGH_NEGATIVE);
    }
    return 0;
}


/* Return value:
 *  the number of waiters woken, 0 if none were waiting
 */
static inline int
futex_event_signal(struct futex* pf, bool reset)
{
    int m, n, retry;

    n = AtomicSetValue(pf->count, reset ? 0 : LARGE_ENOUGH_NEGATIVE);	/* old value = number of waiters */
    if (0 < n) {
        retry = 10;
        m = n;
        do {
            /* a waiter may have bumped count without having entered FUTEX_WAIT yet,
             * so keep waking (spinning/yielding in between) until all n are released */
            n -= sys_futex(&pf->lock, FUTEX_WAKE, n, NULL);
            if (0 == n)
                return m;
            if (retry --) { 
                nop(); 
            } 
            else { 
                retry = 10; 
                thread_yield();
            }
        } while (1);
    }
    return 0;
}

static inline void
futex_event_reset(struct futex* pf) 
{
    int n, retry = 10;
    do {
        n = AtomicSetValueIf(pf->count, 0, LARGE_ENOUGH_NEGATIVE);	/* signaled -> non-signaled */
        if (0 <= n || LARGE_ENOUGH_NEGATIVE == n) {
            return;	/* nothing to reset, or the reset just succeeded */
        }
        /* otherwise count is in a transient state (a waiter sits between its
         * atomic_add and AtomicSetValue in futex_event_wait): spin and retry */
        if (retry --) { 
            nop(); 
        } 
        else { 
            retry = 10; 
            thread_yield();
        }
    } while (1);
}
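
/* Usage sketch (not part of the original post): the three functions above act
 * like a Win32-style event. futex_init(&ev, 0) creates a non-signaled event;
 * futex_event_signal(&ev, false) releases every current waiter and leaves the
 * event signaled, so later waiters return immediately until futex_event_reset()
 * puts it back to non-signaled; futex_event_signal(&ev, true) wakes the current
 * waiters and resets in the same step. The round trip below is hypothetical.
 */
static inline void
example_event_roundtrip(struct futex* ev)
{
	futex_init(ev, 0);			/* start non-signaled */
	/* ... other threads block in futex_event_wait(ev, NULL, false) ... */
	futex_event_signal(ev, false);		/* wake them all, stay signaled */
	futex_event_wait(ev, NULL, false);	/* returns immediately: still signaled */
	futex_event_reset(ev);			/* back to non-signaled */
}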

#ifdef	__cplusplus
}
#endif
