88import java .util .HashMap ;
99import java .util .List ;
1010import java .util .Map ;
11+ import java .util .TreeMap ;
1112
1213import org .jlab .jnp .hipo4 .data .Bank ;
1314import org .jlab .jnp .hipo4 .data .Event ;
@@ -35,10 +36,13 @@ public class Processor {
3536
3637 private final String outputPrefix = "tmp_" ;
3738
39+ private Bank runConfig = null ;
40+ private Bank recEvent = null ;
3841 private ConstantsManager conman = null ;
3942 private SchemaFactory schemaFactory = null ;
4043 private DaqScalersSequence chargeSequence = null ;
4144 private HelicitySequenceDelayed helicitySequence = null ;
45+ private TreeMap <Integer ,Integer > eventUnix = null ;
4246
4347 public Processor (File file , boolean restream , boolean rebuild ) {
4448 configure (Arrays .asList (file .getAbsolutePath ()), restream , rebuild );
@@ -58,19 +62,24 @@ public Processor(SchemaFactory schema, HelicitySequenceDelayed h, DaqScalersSequ
5862 schemaFactory = schema ;
5963 helicitySequence = h ;
6064 chargeSequence = s ;
65+ runConfig = new Bank (schemaFactory .getSchema ("RUN::config" ));
66+ recEvent = new Bank (schemaFactory .getSchema ("REC::Event" ));
6167 }
6268
6369 private void configure (List <String > preloadFiles , boolean restream , boolean rebuild ) {
6470 if (!preloadFiles .isEmpty ()) {
6571 HipoReader r = new HipoReader ();
6672 r .open (preloadFiles .get (0 ));
73+ schemaFactory = r .getSchemaFactory ();
74+ r .close ();
75+ runConfig = new Bank (schemaFactory .getSchema ("RUN::config" ));
76+ recEvent = new Bank (schemaFactory .getSchema ("REC::Event" ));
6777 conman = new ConstantsManager ();
6878 conman .init (CCDB_TABLES );
69- schemaFactory = r .getSchemaFactory ();
7079 helicitySequence = Util .getHelicity (preloadFiles , schemaFactory , restream , conman );
7180 if (rebuild ) chargeSequence = DaqScalersSequence .rebuildSequence (1 , conman , preloadFiles );
7281 else chargeSequence = DaqScalersSequence .readSequence (preloadFiles );
73- r . close ();
82+ eventUnix = getEventUnixMap ( preloadFiles );
7483 }
7584 }
7685
@@ -92,117 +101,181 @@ private static List<String> findPreloadFiles(String dir, String glob) {
92101 return ret ;
93102 }
94103
104+ /**
105+ * Load the mapping from event number to unix time
106+ * @param files
107+ * @return map
108+ */
109+ private TreeMap <Integer ,Integer > getEventUnixMap (List <String > files ) {
110+ Bank unix = new Bank (schemaFactory .getSchema ("RUN::unix" ));
111+ TreeMap <Integer ,Integer > m = new TreeMap <>();
112+ Event e = new Event ();
113+ for (String f : files ) {
114+ HipoReader r = new HipoReader ();
115+ r .setTags (1 );
116+ r .open (f );
117+ while (r .hasNext ()) {
118+ r .nextEvent (e );
119+ e .read (unix );
120+ int size = unix .getRows ();
121+ for (int i =0 ; i <size ; i ++) {
122+ m .put (unix .getInt ("event" ,i ), unix .getInt ("unixtime" ,i ));
123+ }
124+ }
125+ r .close ();
126+ }
127+ return m ;
128+ }
129+
95130 /**
96131 * Modify REC::Event/HEL::scaler with the delay-corrected helicity
97132 * @param event
98- * @param runConfig
99- * @param recEvent
133+ * @param runcfg
134+ * @param recevt
100135 */
101- private void processEventHelicity (DataEvent event , DataBank runConfig , DataBank recEvent ) {
102- HelicityBit hb = helicitySequence .search (runConfig .getLong ("timestamp" , 0 ));
136+ private void processEventHelicity (DataEvent event , DataBank runcfg , DataBank recevt ) {
137+ HelicityBit hb = helicitySequence .search (runcfg .getLong ("timestamp" , 0 ));
103138 HelicityBit hbraw = helicitySequence .getHalfWavePlate () ? HelicityBit .getFlipped (hb ) : hb ;
104- recEvent .setByte ("helicity" ,0 ,hb .value ());
105- recEvent .setByte ("helicityRaw" ,0 ,hbraw .value ());
139+ recevt .setByte ("helicity" ,0 ,hb .value ());
140+ recevt .setByte ("helicityRaw" ,0 ,hbraw .value ());
106141 DataBank helScaler = event .getBank ("HEL::scaler" );
107142 if (helScaler .rows ()>0 ) {
108143 event .removeBank ("HEL::scaler" );
109- Util .assignScalerHelicity (runConfig .getLong ("timestamp" ,0 ), ((HipoDataBank )helScaler ).getBank (), helicitySequence );
144+ Util .assignScalerHelicity (runcfg .getLong ("timestamp" ,0 ), ((HipoDataBank )helScaler ).getBank (), helicitySequence );
110145 event .appendBank (helScaler );
111146 }
112147 }
113148
114149 /**
115150 * Modify REC::Event/HEL::scaler with the delay-corrected helicity
116151 * @param event
117- * @param runConfig
118- * @param recEvent
152+ * @param runcfg
153+ * @param recevt
119154 */
120- private void processEventHelicity (Event event , Bank runConfig , Bank recEvent ) {
121- HelicityBit hb = helicitySequence .search (runConfig .getLong ("timestamp" , 0 ));
155+ private void processEventHelicity (Event event , Bank runcfg , Bank recevt ) {
156+ HelicityBit hb = helicitySequence .search (runcfg .getLong ("timestamp" , 0 ));
122157 HelicityBit hbraw = helicitySequence .getHalfWavePlate () ? HelicityBit .getFlipped (hb ) : hb ;
123- recEvent .setByte ("helicity" ,0 ,hb .value ());
124- recEvent .setByte ("helicityRaw" ,0 ,hbraw .value ());
158+ recevt .setByte ("helicity" ,0 ,hb .value ());
159+ recevt .setByte ("helicityRaw" ,0 ,hbraw .value ());
125160 Bank helScaler = new Bank (schemaFactory .getSchema ("HEL::scaler" ));
126161 event .read (helScaler );
127162 if (helScaler .getRows ()>0 ) {
128163 event .remove (schemaFactory .getSchema ("HEL::scaler" ));
129- Util .assignScalerHelicity (runConfig .getLong ("timestamp" ,0 ), helScaler , helicitySequence );
164+ Util .assignScalerHelicity (runcfg .getLong ("timestamp" ,0 ), helScaler , helicitySequence );
130165 event .write (helScaler );
131166 }
132167 }
133168
134169 /**
135170 * Modify REC::Event for beam charge and livetime
136- * @param runConfig
137- * @param recEvent
171+ * @param runcfg
172+ * @param recevt
138173 */
139- private void processEventScalers (DataBank runConfig , DataBank recEvent ) {
140- DaqScalers ds = chargeSequence .get (runConfig .getLong ("timestamp" , 0 ));
174+ private void processEventScalers (DataBank runcfg , DataBank recevt ) {
175+ DaqScalers ds = chargeSequence .get (runcfg .getLong ("timestamp" , 0 ));
141176 if (ds != null ) {
142- recEvent .setFloat ("beamCharge" ,0 , (float ) ds .dsc2 .getBeamChargeGated ());
143- recEvent .setDouble ("liveTime" ,0 ,ds .dsc2 .getLivetime ());
177+ recevt .setFloat ("beamCharge" ,0 , (float ) ds .dsc2 .getBeamChargeGated ());
178+ recevt .setDouble ("liveTime" ,0 ,ds .dsc2 .getLivetime ());
144179 }
145180 }
146181
147182 /**
148183 * Modify REC::Event for beam charge and livetime
149- * @param runConfig
150- * @param recEvent
184+ * @param runcfg
185+ * @param recevt
151186 */
152- private void processEventScalers (Bank runConfig , Bank recEvent ) {
153- DaqScalers ds = chargeSequence .get (runConfig .getLong ("timestamp" , 0 ));
187+ private void processEventScalers (Bank runcfg , Bank recevt ) {
188+ DaqScalers ds = chargeSequence .get (runcfg .getLong ("timestamp" , 0 ));
154189 if (ds != null ) {
155- recEvent .putFloat ("beamCharge" ,0 , (float ) ds .dsc2 .getBeamChargeGated ());
156- recEvent .putDouble ("liveTime" ,0 ,ds .dsc2 .getLivetime ());
190+ recevt .putFloat ("beamCharge" ,0 , (float ) ds .dsc2 .getBeamChargeGated ());
191+ recevt .putDouble ("liveTime" ,0 ,ds .dsc2 .getLivetime ());
192+ }
193+ }
194+
195+ /**
196+ * Modify REC::Event for beam charge and livetime
197+ * @param runcfg
198+ * @param runcfg
199+ */
200+ private void processEventUnix (Event event , Bank runcfg ) {
201+ if (runcfg .getRows () > 0 ) {
202+ Integer unix = eventUnix .get (eventUnix .floorKey (runcfg .getInt ("event" ,0 )));
203+ if (unix != null ) {
204+ event .remove (runcfg .getSchema ());
205+ runcfg .putInt ("unixtime" , 0 , unix );
206+ event .write (runcfg );
207+ }
208+ }
209+ }
210+
211+ /**
212+ * Modify REC::Event for beam charge and livetime
213+ * @param runcfg
214+ * @param runcfg
215+ */
216+ private void processEventUnix (DataEvent event , DataBank runcfg ) {
217+ if (runcfg .rows () > 0 ) {
218+ Integer unix = eventUnix .get (eventUnix .floorKey (runcfg .getInt ("event" ,0 )));
219+ if (unix != null ) {
220+ event .removeBank (runcfg .getDescriptor ().getName ());
221+ runcfg .setInt ("unixtime" , 0 , unix );
222+ event .appendBank (runcfg );
223+ }
157224 }
158225 }
159226
160227 /**
161228 * Postprocess one event
162- * @param e
229+ * @param event
163230 */
164- public void processEvent (DataEvent e ) {
165- if (!e .hasBank ("RUN::config" )) return ;
166- if (!e .hasBank ("REC::Event" )) return ;
167- DataBank runConfig = e .getBank ("RUN::config" );
168- DataBank recEvent = e .getBank ("REC::Event" );
169- if (runConfig .rows ()<1 || recEvent .rows ()<1 ) return ;
170- e .removeBank ("REC::Event" );
171- if (helicitySequence != null ) processEventHelicity (e , runConfig , recEvent );
172- if (chargeSequence != null ) processEventScalers (runConfig , recEvent );
173- e .appendBank (recEvent );
231+ public void processEvent (DataEvent event ) {
232+ if (event .hasBank ("RUN::config" )) {
233+ DataBank runcfg = event .getBank ("RUN::config" );
234+ if (runcfg .rows () > 0 ) {
235+ processEventUnix (event , runcfg );
236+ if (event .hasBank ("REC::Event" )) {
237+ DataBank recevt = event .getBank ("REC::Event" );
238+ if (recevt .rows () > 0 ) {
239+ event .removeBank ("REC::Event" );
240+ if (helicitySequence != null ) processEventHelicity (event , runcfg , recevt );
241+ if (chargeSequence != null ) processEventScalers (runcfg , recevt );
242+ event .appendBank (recevt );
243+ }
244+ }
245+ }
246+ }
174247 }
175248
176249 /**
177250 * Postprocess one event
178- * @param e
251+ * @param event
179252 */
180- public void processEvent (Event e ) {
181- if (! e . hasBank ( schemaFactory . getSchema ( "RUN::config" ))) return ;
182- if (! e . hasBank ( schemaFactory . getSchema ( "REC::Event" ))) return ;
183- Bank runConfig = new Bank ( schemaFactory . getSchema ( "RUN::config" ));
184- Bank recEvent = new Bank ( schemaFactory . getSchema ( "REC::Event" ) );
185- e . read ( runConfig );
186- e . read (recEvent );
187- if (runConfig . getRows ()< 1 || recEvent . getRows ()< 1 ) return ;
188- e . remove ( schemaFactory . getSchema ( "REC::Event" ) );
189- if ( helicitySequence != null ) processEventHelicity ( e , runConfig , recEvent );
190- if ( chargeSequence != null ) processEventScalers ( runConfig , recEvent );
191- e . write ( recEvent );
253+ public void processEvent (Event event ) {
254+ event . read ( runConfig ) ;
255+ event . read ( recEvent ) ;
256+ if ( runConfig . getRows () > 0 ) {
257+ processEventUnix ( event , runConfig );
258+ if ( recEvent . getRows () > 0 ) {
259+ event . remove (recEvent . getSchema () );
260+ if (helicitySequence != null ) processEventHelicity ( event , runConfig , recEvent ) ;
261+ if ( chargeSequence != null ) processEventScalers ( runConfig , recEvent );
262+ event . write ( recEvent );
263+ }
264+ }
192265 }
193266
194267 /**
195268 * Create rebuilt files from preload files.
196- * @param preloadFiles
269+ * @param files
197270 * @return map of rebuilt:preload files
198271 */
199- private Map <String ,String > rebuild (String dir , List <String > preloadFiles ) {
272+ private Map <String ,String > rebuild (String dir , List <String > files ) {
200273 File d = new File (dir );
201274 if (!d .canWrite ()) {
202275 throw new RuntimeException ("No write permissions on " +dir );
203276 }
204277 Map <String ,String > rebuiltFiles = new HashMap <>();
205- for (String preloadFile : preloadFiles ) {
278+ for (String preloadFile : files ) {
206279 String rebuiltFile = dir +"/" +outputPrefix +preloadFile .replace (dir +"/" ,"" );
207280 Util .rebuildScalers (conman , preloadFile , rebuiltFile );
208281 rebuiltFiles .put (rebuiltFile ,preloadFile );
0 commit comments