summaryrefslogtreecommitdiff
path: root/lib/Event/File/tail.pm
blob: b3a49a86b4ed275cc42f1c85f791e35a9d4ab0e1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
=head1 NAME

  Event::File::tail - a 'tail -f' implementation using Event

=head1 SYNOPSIS

  use Event::File;
  Event::File->tail(
		    file => '/var/log/messages',
		    cb   => \&my_read_callback_function
		   );
  Event::loop;

=head1 DESCRIPTION

Event::File::tail is an attempt to reproduce the behaviour of
'tail -f' using Event as the backend.

The main difference between this module and other modules that try
to implement it is that it leaves room for parallel processing using
Event.

=cut

package Event::File::tail;
use strict;
use vars qw($VERSION $ID);

# Module version string.
$VERSION = "0.1.1";

# Package-wide counter used to hand out watcher ids: new() increments it and
# stores the resulting value in each object it creates.
$ID = 0;

# Calls the register method of Event::File at load time.
# NOTE(review): presumably this is what makes Event::File->tail(...) (as used
# in the SYNOPSIS) available -- confirm against Event::File.
'Event::File'->register;

# Constructor.
# Takes a flat list of attribute => value pairs (see "Supported attributes"),
# builds the object, lets _init() copy the attributes in and _prepare() set up
# the helper watchers, then returns the new object.
sub new {
  my ($class, %arg) = @_;

  # every object gets the next value of the package-wide counter as its id
  $ID++;

  # internal bookkeeping, never set directly by the user
  my %data = (
    # our internal id
    id             => $ID,

    # 1 when the file being processed was recognised as the same file as
    # before, 0 otherwise
    file_ok        => 0,

    # number of lines stored in the footprint so far
    # (_file_read stops collecting at a hardcoded limit of 10)
    file_count     => 0,

    # first lines of the file, used to recognise it again later
    file_footprint => [],

    # timer ticks counted since the file last produced something
    # (the file might have been rotated without us noticing)
    timer_count    => 0,

    # 1 when the file exists, 0 when it does not
    file_exist     => 0,
  );

  # references to the helper watchers; they are filled in by _prepare()
  # and _open_file()
  my %watchers = (
    timer => undef,
    read  => undef,
  );

  my $o = {
    _arg      => {},        # user visible ("shared") attributes
    _data     => \%data,
    _watchers => \%watchers,
  };

  # NOTE(review): the object is always blessed into this package; $class is
  # not consulted, exactly as with the single argument form of bless.
  bless $o, __PACKAGE__;

  # our initialization
  $o->_init(\%arg);

  $o->_prepare();
  return $o;
}

=head2 Supported attributes

=over

=item file

file gives the name of the file to watch, with either an absolute or a relative path.
The file has to exist during initialization.  After that, it can be unlinked and
recreated.

=item position

where to start reading from the file, in bytes.  As the file is read, it 
will be updated with the last position read.

=item footprint

footprint, if defined, is an array reference of n elements.  Each element corresponds
to a line from the beginning of the file. If any line does not match the file,
C<position> will be ignored and reading will start from the beginning.

=item cb

cb is the standard callback. It will receive a code reference that will
be called after every line read from the C<file>. The newline from the line
will be C<chomp>ed before passed.
The Event::File::tail object will be passed as the first argument.
The read line will be passed as the second argument.

=item timeout

The timeout starts counting when the file read gets to the end.
If a new line is added to the file, the timer count is reset.
Its main use is to catch the situation where the C<file> was rotated
and the rotation was not noticed.
The file will be closed and reopened.  If the file is still the same,
reading will continue from the place it was at before closing it.  If the
file has really changed, it will be read from the beginning.
If not specified, it defaults to 60s.

=item timeout_cb

This is the callback to call when a timeout occurs.
The timeout callback will only be called if the reopened file
turns out to be the same file.

=item endfile_cb

This callback will be called every time the file read gets
to the end.  Use it if you need to do something after reading the
file (instead of after each line read).

=item desc

Description of the watcher.

=back

=head1 METHODS

These are the methods available for use with the tail watcher.

=cut



# Internal initializer.
# Copies the user supplied attributes from the hash reference $arg into the
# object, using a default for every attribute that is not defined.
# Dies when no 'file' attribute is given.
sub _init{
  my ($me, $arg) = @_;

  unless (defined($arg->{file})){
    die "You must provide a 'file' argument to tail\n";
  }

  my $shared = ($me->{_arg} ||= {});
  $shared->{file} = $arg->{file};

  # attribute name => value used when the caller did not define it
  my %default = (
    position   => 0,          # start reading at the first byte
    cb         => sub {},     # read callback
    timeout    => 60,         # timeout value
    timeout_cb => sub {},     # timeout callback
    endfile_cb => sub {},     # end of file callback
    desc       => "Event::File->tail watcher. ID: " . $me->{_data}->{id},
  );

  foreach my $name (keys %default){
    $shared->{$name} = defined($arg->{$name}) ? $arg->{$name} : $default{$name};
  }

  # the footprint is kept with the internal data, not with the attributes
  $me->{_data}->{file_footprint} =
    defined($arg->{footprint}) ? $arg->{footprint} : [];
}

# Internal: prepares the real watchers.
# Opens the file, validates the saved position/footprint against it, creates
# the helper timer (parked) and finally starts the read watcher.
sub _prepare{
  my ($me) = @_;

  #opens the file; the read watcher is created parked and is started below
  $me->_open_file(1);

  #checks whether the file is still the one the footprint/position describe
  if (!$me->_file_check_sanity()){
    #out of sync: the saved position and footprint belong to another file.
    #Forget them, so that reading starts from the beginning and a fresh
    #footprint is collected instead of being appended to the stale one
    #(which may be an array owned by the caller)
    $me->{_arg}->{position}        = 0;
    $me->{_data}->{file_count}     = 0;
    $me->{_data}->{file_footprint} = [];
  }

  #adjusts the file: the saved position when in sync, 0 otherwise
  $me->{fd}->seek($me->{_arg}->{position}, 0);

  # The helper timer; it is created parked and is first armed by _file_read
  # when the end of the file is reached
  $me->{_watchers}->{timer} = Event->timer(
					   cb       => [$me, '_file_timer'],
					   desc     => 'helper watcher for Event::File->tail ID:' . $me->{_data}->{id},
					   interval => 1,
					   prio     => 6,
					   parked   => 1,
					  );

  #now we start the read watcher
  $me->{_watchers}->{read}->start;
}

# Callback of the read (io) helper watcher.
#   $me    - the tail object
#   $event - the event; $event->w is the io watcher, $event->w->fd its handle
# At end of file it records the new position, calls endfile_cb when the
# position changed, stops the read watcher and hands over to the timer.
# Otherwise it reads one line, chomps it, stores it in the footprint when a
# footprint is being collected, and passes it to the read callback.
sub _file_read{
  my ($me, $event)=@_;

  my $handler = $event->w->fd;

  #There is nothing to read in the file
  if (eof($handler)){

    #saves the file position
    #Attention:
    #if using sys{read,write,seek} family,
    #would have to use 'sysseek($handler, 0, 1)' instead of tell
    my $position = tell($handler);

    #if the file has changed
    if ($me->{_arg}->{position} != $position){

      $me->{_arg}->{position} = $position;

      #reset the counter
      $me->{_data}->{timer_count} = 0;

      #end of file callback
      $me->{_arg}->{endfile_cb}->($me);
    }

    #stops this watcher
    #there is nothing to read
    $event->w->stop;

    #starts the timer
    $me->{_watchers}->{timer}->again;

    return;
  }

  #read into a lexical: assigning to the global $_ here would clobber the
  #$_ of whatever code is running the event loop
  my $line = <$handler>;
  chomp($line);

  #save the first lines of the file
  #right now 10 is a hardcoded limit
  if (( $me->{_data}->{file_count} < 10) && ( $me->{_data}->{file_ok} == 0)){
    push ( @{$me->{_data}->{file_footprint}}, $line);
    $me->{_data}->{file_count}++;
  }

  #this is the default callback.
  #foreach aliases $_ to the line for the duration of the call only, so
  #callbacks that look at $_ keep working and the caller's $_ is restored
  for ($line){
    $me->{_arg}->{cb}->($me, $_);
  }

  $me->{_data}->{timer_count} = 0;
}


#######################################
# _file_timer:
#    callback of the helper timer watcher.
#    Runs while the read watcher is idle and decides what to do next:
#      - the file is gone:       close it and keep polling
#      - the file is back:       reopen it and read it from the beginning
#      - timeout ticks elapsed:  close/reopen the file and check whether it
#                                is still the same one
#      - otherwise:              hand control back to the read watcher
#######################
sub _file_timer{
  my ($me, $event)=@_;

  #update the counter
  $me->{_data}->{timer_count}++;

  #first we check if the file is still there
  if (! -f $me->{_arg}->{file}){
    #no, the file isn't there (oh oh)

    if ($me->{_data}->{file_exist} == 1){
      #we probably just caught the file being rotated

      #remember that the file is gone
      $me->{_data}->{file_exist} = 0;

      #note that we keep the timer on until the file is back.
      #If we were lucky, we got every entry before it was rotated,
      #otherwise we probably lost the last entries.
      #For all effects, we behave like we never lost anything (since
      #we can't guess :\ )

      #suppose you are watching procmail's log file:
      #whoever rotates that file has to make sure it is rotated before
      #mail is delivered, otherwise the entries written in between are
      #lost for sure (sad!) and we will only notice once the file is
      #opened again.

      #reset the counter
      $me->{_data}->{timer_count} = 0;

      #close the file and cancel the read watcher
      $me->_close_file();
    }
    return;
  }

  #was it truncated/rotated and we already noticed it before?
  if ($me->{_data}->{file_exist} == 0){
    #now the file is back, we need to open it again

    #this is a new file: start a new footprint.  file_ok has to be cleared
    #as well, because _file_read only collects footprint lines while it is
    #0, and the saved position belonged to the old file
    $me->{_data}->{file_count} = 0;
    $me->{_data}->{file_footprint} = [];
    $me->{_data}->{file_ok} = 0;
    $me->{_arg}->{position} = 0;

    #we can stop the timer for now
    $me->{_watchers}->{timer}->stop;

    #reset the counter
    $me->{_data}->{timer_count} = 0;

    #open it again (not parked, so the read watcher is active right away)
    $me->_open_file();

    return;
  }

  #checks if we got to the limit.
  #'>=' rather than '==' so that a non-integer timeout is reached too; a
  #timeout that is not positive never triggers, as before
  if ($me->{_arg}->{timeout} > 0
      && $me->{_data}->{timer_count} >= $me->{_arg}->{timeout}){

    $me->{_data}->{timer_count} = 0;

    #The file might have been rotated without us noticing it
    $me->_close_file();
    $me->_open_file(1);

    #there is a great chance that the file is the same as before,
    #so we need to check it again
    if ($me->_file_check_sanity() == 0){

      #another file: reset our counters and read it from the beginning
      $me->{_data}->{file_count} = 0;
      $me->{_data}->{file_footprint} = [];
      $me->{_arg}->{position} = 0;
    }
    else{
      #The timeout callback
      $me->{_arg}->{timeout_cb}->($me);
    }

    #we can stop the timer for now
    $me->{_watchers}->{timer}->stop;
    $me->{_watchers}->{read}->again;

    return;
  }


  #default behaviour: let the read watcher look at the file again

  $me->{_watchers}->{timer}->stop;
  $me->{_watchers}->{read}->again;
}




######################################
# _close_file
#     Cancels the read watcher and closes
#     the file handle, dropping the
#     object's references to both
###################################
sub _close_file{
  my $me = shift;

  #cancel the watcher first, then let go of our reference to it
  my $watchers = $me->{_watchers};
  $watchers->{read}->cancel;
  $watchers->{read} = undef;

  #same for the file handle: close it, then forget it
  $me->{fd}->close();
  $me->{fd} = undef;
}


###########################################
#  _open_file:
#       Opens the file for reading and creates the read (io) watcher for it.
#       $parked - optional; passed on as the 'parked' attribute of the
#                 watcher, defaults to 0
#       Dies when the file cannot be opened.
###############
sub _open_file{
  my ($me, $parked) =@_;

  $parked = 0 if (!defined($parked));

  #this file does not load IO::File anywhere else, so make sure it is there
  require IO::File;

  #open it
  $me->{fd} = IO::File->new($me->{_arg}->{file}, "r")
    or die "Error!! Could not open file: ". $me->{_arg}->{file} ."\nReason: $!\n";

  #it exists
  $me->{_data}->{file_exist} = 1;

  #the file read watcher
  $me->{_watchers}->{read} = Event->io(
				       fd     => $me->{fd},
				       cb     => [$me, "_file_read"],
				       poll   => 'r',
				       desc   => 'Helper watcher for Event::File->tail ID:' . $me->{_data}->{id},
				       parked => $parked,
				      );
}

##################################################
#  _file_check_sanity:
#     Checks whether the opened file is still the same
#     as before, by comparing its size with the saved
#     position and its first lines with the footprint.
#     If it is in sync, the handle is moved to the
#     position where we left off, otherwise to the
#     beginning of the file.
#     The result is also stored in the file_ok flag.
#     It returns:
#     0 => file out of sync (start from the beginning)
#     1 => file in sync
################################
sub _file_check_sanity{
  my ($me) = @_;

  my $size         = (stat($me->{_arg}->{file}))[7];
  my $old_position = $me->{_arg}->{position};
  my $handler      = $me->{fd};
  my $file_ok      = 0;

  #Check if it is possible to be the same file.
  #We don't trust a file that is smaller than it was before (if the file
  #shrinks we better start all over, because it would be impossible to know
  #what exactly happened), nor one whose size cannot be determined
  if ( defined($size) && $size >= $old_position ){

    #checks each line
    $file_ok = 1;
  SANITY_CHECK:
    foreach my $line (@{$me->{_data}->{file_footprint}}){

      #read it
      my $file_line = <$handler>;

      #the file has fewer lines than the footprint: it cannot be the same
      #file (without this check an empty footprint line would match)
      if (!defined($file_line)){
	$file_ok = 0;
	last SANITY_CHECK;
      }

      chomp ($file_line);

      if ($file_line ne $line){
	#out of sync!
	$file_ok = 0;
	last SANITY_CHECK;
      }
    }
  }

  #put the file in the desired position
  $handler->seek($file_ok ? $old_position : 0, 0);

  $me->{_data}->{file_ok} = $file_ok;

  #returns the result
  return $file_ok;
}

=head2 $watcher->desc;

Returns the description of the watcher.

=cut

sub desc{
  my $me = shift;

  #the description lives with the user visible attributes
  return $me->{_arg}{desc};
}

=head2 $watcher->id;

Returns the internal watcher id number.

=cut

sub id{
  my $me = shift;

  #the id is assigned once, in new(), and kept with the internal data
  return $me->{_data}{id};
}

=head2 $position = $watcher->position;

Returns the current file position

=cut

#WARNING: the stored position is only refreshed by _file_read when it reaches
#the end of the file, so it can lag behind the lines already delivered
sub position{
  my $me = shift;
  return $me->{_arg}{position};
}

=head2 $array_ref = $watcher->footprint;

This will return an array reference of the file's footprint.
This is handy if you need to quit your application and after restarting it the
file can be checked whether it is the same or not.

=cut

sub footprint{
  my $me = shift;

  #hands out the internal array reference itself, not a copy
  return $me->{_data}{file_footprint};
}

=head2 $result = $watcher->loop($timeout);

loop is a wrapper around C<Event::loop>, for the case where no other Event watcher is in use.
You have to call it somewhere to let Event watch the
file for you.
C<$result> is the value passed to the C<unloop> method (see below).
Please refer to the loop function in the Event pod page for more info.

=cut

#'loop wrapper
sub loop{
  my ($me, $timeout) = @_;

  #only pass a timeout on when the caller gave one.
  #NOTE(review): Event::loop is understood to treat any argument, even an
  #undefined one, as a timeout -- confirm against the Event documentation.
  #Calling it without arguments is the documented way to loop with no limit.
  return defined($timeout) ? Event::loop($timeout) : Event::loop();
}

=head2 $watcher->unloop($result);

A wrapper to C<Event::unloop>.  This will cancel an active Event::loop, e.g.
when called from a callback.
C<$result> will be passed to the C<loop> caller.
Please refer to Event::unloop for more info.

=cut

#unloop wrapper
sub unloop{
  my $me = shift;

  #only the first remaining argument is the result handed to the loop caller
  my ($result) = @_;
  Event::unloop($result);
}

=head2 $watcher->sweep;

A wrapper around C<Event::sweep>.
C<sweep> will call any pending events and return.
Please refer to C<Event::sweep> for more info.

=cut

#Sweep wrapper
sub sweep{
  my ($me, $prio) = @_;

  #only pass a priority on when the caller gave one.
  #NOTE(review): Event::sweep is understood to use its own default priority
  #only when called without arguments, so forwarding an undefined value
  #would not select that default -- confirm against the Event documentation.
  return defined($prio) ? Event::sweep($prio) : Event::sweep();
}

=head2 $watcher->stop;

This will stop the watcher until a C<again> or C<start> method is called.

=cut

sub stop{
  my ($me) = @_;

  #the read watcher is undefined while the file is closed (_close_file drops
  #it when the file disappears), so only stop the helpers that exist
  foreach my $name (qw(read timer)){
    my $watcher = $me->{_watchers}->{$name};
    $watcher->stop if (defined($watcher));
  }
  return;
}

=head2 $watcher->start;

This method will restart the watcher

=cut

sub start{
  my ($me) = @_;

  #restarting goes through the timer: the read watcher is stopped here and
  #the timer callback (_file_timer) starts it again.
  #The read watcher is undefined while the file is closed (_close_file drops
  #it when the file disappears), so it is only stopped when it exists
  my $read = $me->{_watchers}->{read};
  $read->stop if (defined($read));

  my $timer = $me->{_watchers}->{timer};
  $timer->start if (defined($timer));
  return;
}

=head2 $watcher->again;

The same as C<start>

=cut

sub again{
  my $me = shift;

  #plain alias: everything is delegated to start
  return $me->start;
}

=head2 $watcher->cancel

This will destroy the watcher.
Note that if there is a reference to this watcher outside this package,
the memory won't be freed.

=cut

#'
sub cancel{
  my ($me) = @_;

  #cancel the helper watchers that exist (the read watcher is undefined
  #while the file is closed) and drop our references to them: the watchers
  #are created with a callback that refers to this object, so keeping them
  #here would make the object and its watchers hold on to each other
  foreach my $name (qw(read timer)){
    my $watcher = $me->{_watchers}->{$name};
    next if (!defined($watcher));
    $watcher->cancel;
    $me->{_watchers}->{$name} = undef;
  }

  #release the file handle as well
  if (defined($me->{fd})){
    $me->{fd}->close();
    $me->{fd} = undef;
  }

  return;
}


1;

__END__

=pod

=head1 loop vs sweep

When do you have to use C<loop> or C<sweep>?

Well, that depends.  If you are not familiar with Event, the quick
and dirty answer is that C<loop> will BLOCK and C<sweep> will not.

C<loop> will keep calling the callback functions whenever they are
ready and will only return when a callback calls C<unloop> or a timeout
happens.

On the other hand, if you are not using Event for anything else in your program,
this might not be the desired behaviour.
C<sweep> can then be called to check whether some event has happened or not.
If it has, it will execute all the pending callbacks and then return (as opposed
to C<loop>). So, long loops might be a good place to use it.

=head1 IMPLEMENTATION

Event::File::tail is a fake watcher from Event's point of view.  On the other hand, it
does use two helper watchers for each Event::File::tail: a read io watcher and a timer watcher.
In case you are debugging and need to find out about them, every tail watcher has a unique
id during the program execution (use C<< $watcher->id >> to retrieve it).  Each helper watcher
has the id number in its description (desc).

=head1 SEE ALSO

Event(3), Tutorial.pdf, cmc

=head1 AUTHOR

Raul Dias <raul@dias.com.br>