wso2cep - When joining event streams in WSO2 CEP, each output event is produced twice
I am using WSO2 CEP embedded inside WSO2 DAS with the following execution plan:
@import('relatedstream:1.0.0')
define stream rs (id string, product string, uc1 string, state string, brand string, model string, type string, tweet string, rtid string);

@import('inputstream:1.0.0')
define stream ins (id string, product string, uc1 string, state string, brand string, model string, type string, tweet string);

@export('matchingstream:1.0.0')
define stream ms (rid string, rproduct string, ruc1 string, rstate string, rbrand string, rmodel string, rtype string, hid string, hproduct string, huc1 string, hstate string, hbrand string, hmodel string, htype string);

from ins#window.time(2 sec) as r
  join rs#window.length(1) as h
  on r.product == h.product and r.brand == h.brand and r.type == h.type
     and r.model == h.model and r.state != h.state and r.id == h.rtid
select r.id as rid, r.product as rproduct, r.uc1 as ruc1, r.state as rstate,
       r.brand as rbrand, r.model as rmodel, r.type as rtype,
       h.id as hid, h.product as hproduct, h.uc1 as huc1, h.state as hstate,
       h.brand as hbrand, h.model as hmodel, h.type as htype
insert all events into ms;
Every output event is produced twice with the same values.
The event message tracer log is below.
Stream 1 (inputstream):
08:23:21,845 [-] [databridge-core-pool-1-thread-3] info tenantid : -1234, event processor : r_h_match, event stream : inputstream:1.0.0 (ins), before processing _event{timestamp=1464144801440, data=[27, phone, g1, sell, samsung, galaxy note, type, use um10 10% off #unlockyourphone galaxy note 6 reportedly first samsung phone feature us... sell], isexpired=false} (sanitized)
Stream 2 (relatedstream):
08:23:21,974 [-] [databridge-core-pool-1-thread-2] info tenantid : -1234, event processor : r_h_match, event stream : relatedstream:1.0.0 (rs), before processing _event{timestamp=1464144801943, data=[5, phone, g1, sell, samsung, iphone 4s, type, white 3 usb port car charger adapter iphone 4s 5s 5c 6 6s ipad samsung phone - bid now? sell, 27], isexpired=false} (sanitized)
08:23:21,998 [-] [databridge-core-pool-1-thread-4] info tenantid : -1234, event processor : r_h_match, event stream : relatedstream:1.0.0 (rs), before processing _event{timestamp=1464144801944, data=[11, phone, g1, buy, samsung, galaxy s4, type, #cellular #deals samsung galaxy s4 sch-i545 16gb verizon at&t gsm unlocked cell phone rf buy, 27], isexpired=false} (sanitized)
08:23:22,030 [-] [databridge-core-pool-1-thread-5] info tenantid : -1234, event processor : r_h_match, event stream : relatedstream:1.0.0 (rs), before processing _event{timestamp=1464144801944, data=[13, phone, g1, sell, samsung, galaxy note, type, unlocked t-mobile samsung galaxy note 3 sm-n900t 4g lte gsm 32gb smart phone sell, 27], isexpired=false} (sanitized)
08:23:22,031 [-] [databridge-core-pool-1-thread-5] info tenantid : -1234, event processor : r_h_match, event stream : relatedstream:1.0.0 (rs), before processing _event{timestamp=1464144801944, data=[18, phone, g1, sell, samsung, galaxy s6, type, cell phones : new samsung galaxy s6 edge sm-g925f 5.1'' 16mp (factory unlocked) 32gb phone sell, 27], isexpired=false} (sanitized)
08:23:22,031 [-] [databridge-core-pool-1-thread-5] info tenantid : -1234, event processor : r_h_match, event stream : relatedstream:1.0.0 (rs), before processing _event{timestamp=1464144801944, data=[19, phone, g1, sell, samsung, galaxy s4, type, #cellular #deals samsung i545 galaxy s4 16gb verizon 13mp camera wifi cell phone sell, 27], isexpired=false} (sanitized)
08:23:22,031 [-] [databridge-core-pool-1-thread-5] info tenantid : -1234, event processor : r_h_match, event stream : relatedstream:1.0.0 (rs), before processing _event{timestamp=1464144801944, data=[1, phone, g1, sell, samsung, galaxy s6, type, cell phone usa : new samsung galaxy s6 edge sm-g925f 5.1'' 16mp (factory unlocked) 32gb ph? sell, 27], isexpired=false} (sanitized)
08:23:22,033 [-] [databridge-core-pool-1-thread-5] info tenantid : -1234, event processor : r_h_match, event stream : relatedstream:1.0.0 (rs), before processing _event{timestamp=1464144801944, data=[14, phone, g1, sell, samsung, galaxy note, type, galaxy note 6 reportedly first samsung phone feature usb-c - sell, 27], isexpired=false} (sanitized)
08:23:22,034 [-] [databridge-core-pool-1-thread-5] info tenantid : -1234, event processor : r_h_match, event stream : relatedstream:1.0.0 (rs), before processing _event{timestamp=1464144801945, data=[16, phone, g1, buy, samsung, galaxy note, type, #unlocksquare galaxy note 6 reportedly first samsung phone feature usb-c buy, 27], isexpired=false} (sanitized)
08:23:22,035 [-] [databridge-core-pool-1-thread-5] info tenantid : -1234, event processor : r_h_match, event stream : relatedstream:1.0.0 (rs), before processing _event{timestamp=1464144801945, data=[17, phone, g1, buy, samsung, galaxy s5, type, cell phone usa : new t-mobile protective cover/holster samsung galaxy s5 case kickstan? buy, 27], isexpired=false} (sanitized)
08:23:22,035 [-] [databridge-core-pool-1-thread-5] info tenantid : -1234, event processor : r_h_match, event stream : relatedstream:1.0.0 (rs), before processing _event{timestamp=1464144801945, data=[15, phone, g1, buy, samsung, galaxy s6, type, cell phone usa : samsung galaxy s6 sm-g920f 32gb unlocked 16mp smartphone #4422 buy, 27], isexpired=false} (sanitized)
Output stream (matchingstream):
08:23:22,041 [-] [siddhi-r_h_match-executor-thread-1] info tenantid : -1234, event processor : r_h_match, event stream : matchingstream:1.0.0 (ms), after processing _[event{timestamp=1464144801945, data=[27, phone, g1, sell, samsung, galaxy note, type, 16, phone, g1, buy, samsung, galaxy note, type], isexpired=false}] (sanitized)
08:23:22,120 [-] [siddhi-r_h_match-executor-thread-1] info tenantid : -1234, event processor : r_h_match, event stream : matchingstream:1.0.0 (ms), after processing _[event{timestamp=1464144802037, data=[27, phone, g1, sell, samsung, galaxy note, type, 16, phone, g1, buy, samsung, galaxy note, type], isexpired=false}] (sanitized)
When you insert all events into the output event stream, the output includes both current events (incoming events) and expired events (events emitted by a window after it times out or when the window length is exceeded). Hence there is a possibility of getting duplicates.
If the requirement is to trigger output only on incoming events of the 'rs' stream, you can try the following (with current events):
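To make the current/expired distinction concrete, here is a minimal sketch of the three output event types, assuming Siddhi 3.x-style syntax. The stream names (DemoIn, CurrentOut, ExpiredOut, AllOut) are hypothetical and chosen only for illustration; they are not part of the execution plan in the question.

```sql
-- Hypothetical input stream, for illustration only.
define stream DemoIn (id string, state string);

-- Emits an output event only when an event enters the window.
from DemoIn#window.time(2 sec)
select id, state
insert current events into CurrentOut;

-- Emits an output event only when an event leaves the window.
from DemoIn#window.time(2 sec)
select id, state
insert expired events into ExpiredOut;

-- Emits on both arrival and expiry, so each input can appear twice.
from DemoIn#window.time(2 sec)
select id, state
insert all events into AllOut;
```

With a 2 sec time window, an event sent to DemoIn should show up once in CurrentOut on arrival, once in ExpiredOut roughly two seconds later, and at both moments in AllOut.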
from ins#window.time(2 sec) as r
  join rs#window.length(0) as h
  on r.product == h.product and r.brand == h.brand and r.type == h.type
     and r.model == h.model and r.state != h.state and r.id == h.rtid
select r.id as rid, r.product as rproduct, r.uc1 as ruc1, r.state as rstate,
       r.brand as rbrand, r.model as rmodel, r.type as rtype,
       h.id as hid, h.product as hproduct, h.uc1 as huc1, h.state as hstate,
       h.brand as hbrand, h.model as hmodel, h.type as htype
insert current events into ms;