现在的位置: 首页 > 综合 > 正文

【原创】StreamInsight查询系列(十九)——查询模式之检测异常

2012年04月24日 ⁄ 综合 ⁄ 共 2805字 ⁄ 字号 评论关闭

上篇文章介绍了查询模式中如何发现趋势,这篇博文将介绍StreamInsight中如何检测异常。

测试数据准备

为了方便测试查询,我们首先准备一个静态的测试数据源:

var now = DateTime.Parse("09/12/2011 8:57:00 PM");
var input = new[]
{
    new { Time = now + TimeSpan.FromSeconds(1), Value = 20},
    new { Time = now + TimeSpan.FromSeconds(2), Value = 30},
    new { Time = now + TimeSpan.FromSeconds(3), Value = 120},
    new { Time = now + TimeSpan.FromSeconds(4), Value = 200},
    new { Time = now + TimeSpan.FromSeconds(5), Value = 20},
    new { Time = now + TimeSpan.FromSeconds(6), Value = 110},
    new { Time = now + TimeSpan.FromSeconds(7), Value = 110},
    new { Time = now + TimeSpan.FromSeconds(8), Value = 210},
    new { Time = now + TimeSpan.FromSeconds(9), Value = 120},
    new { Time = now + TimeSpan.FromSeconds(10), Value = 130},
    new { Time = now + TimeSpan.FromSeconds(11), Value = 20},
    new { Time = now + TimeSpan.FromSeconds(12), Value = 30},
};

接下去将上述数据源转变为点类型复杂事件流:

var inputStream = input.ToPointStream(Application, t =>
        PointEvent.CreateInsert(t.Time.ToLocalTime(), new { Value = t.Value }),
        AdvanceTimeSettings.IncreasingStartTime);

异常检测

问题:怎样每秒1次的计算过去5秒内Value字段值超过阈值100的事件数超过事件总数目80%的异常事件?

首先我们定义一下结果事件流中的负载类型SpikeEvent如下:

struct SpikeEvent
{
    public double Ratio { get; set; }
}

我们最终希望调用查询的方式如下:

int threshold = 100;
double ratio = 0.8;

var resultStream = DetectSpikes(
    inputStream,
    threshold, // 指定阈值(超过该阈值的事件被认为是“特殊事件”)
    ratio, // “特殊事件”占事件总数的百分比
    TimeSpan.FromSeconds(5), // 窗口大小
    TimeSpan.FromSeconds(1), // 跳跃大小
    e => e.Value); // 指定的比较字段

因此最关键的部分就是如何实现DetectSpikes。阅读过

StreamInsight查询系列(十五)——查询模式之窗口比率》文章的读者应该对此类查询并不陌生。

这里不加过多描述地给出DetectSpikes的实现:

/// <summary>
/// 在输入流中检测异常
/// </summary>
/// <typeparam name="TInput">输入流事件类型</typeparam>
/// <param name="inputStream">输入流</param>
/// <param name="threshold">异常定义阈值</param>
/// <param name="ratio">异常事件占事件总数的百分比</param>
/// <param name="windowSize">衡量事件数目的窗口大小</param>
/// <param name="hopSize">跳跃大小</param>
/// <param name="fieldSelector">选择输入事件中的某个字段来检测事件类型</param>
/// <returns>query that detects the spikes</returns>
private static CepStream<SpikeEvent> DetectSpikes<TInput>(
    CepStream<TInput> inputStream, int threshold, double ratio,
    TimeSpan windowSize, TimeSpan hopSize,
    Expression<Func<TInput, int>> fieldSelector)
{
    // 统计跳跃窗口内所有事件的数目
    var totalValues = from w in inputStream.HoppingWindow(
        windowSize,
        hopSize,
        HoppingWindowOutputPolicy.ClipToWindowEnd)
                      select new
                      {
                          Count = w.Count(),
                      };

    // 构造包含过滤条件的LINQ语句
    var parameter = fieldSelector.Parameters.First();
    var field = fieldSelector.Body;
    Expression<Func<TInput, bool>> filterExpression = (Expression<Func<TInput, bool>>)Expression.Lambda(
        Expression.GreaterThan(field, Expression.Constant(threshold)),
        parameter);

    // 统计跳跃窗口内异常事件的数目
    var bigValues = from w in inputStream.Where(filterExpression).HoppingWindow(
        windowSize,
        hopSize,
        HoppingWindowOutputPolicy.ClipToWindowEnd)
                    select new
                    {
                        Count = w.Count(),
                    };

    // 选择异常事件数目超过事件总数一定百分比的事件
    var output = from total in totalValues.ToPointEventStream()
                 join big in bigValues.ToPointEventStream()
                    on true equals true
                 where big.Count * 1.0 / total.Count >= ratio
                 select new SpikeEvent { Ratio = big.Count * 1.0 / (total.Count) };

    return output;
}

输出结果如下:

下一篇将介绍StreamInsight查询模式中如何检测间隙事件。

抱歉!评论已关闭.