How To Make a Log Monitor

Monitoring log files is useful for overseeing application status: it allows operators to take necessary actions to prevent incidents. However, it is sometimes hard to find a simple, ready-to-use monitor for a specific use case. Here is a tutorial on how to build a very basic monitor for HTTP access logs using a few lines of Python. HTTP access logs look like the example below:

127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326 "http://www.example.com/start.html" "Mozilla/4.08 [en] (Win98; I ;Nav)"
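For reference, the fields of this (combined) log format are, in order:

  127.0.0.1                            remote host
  -                                    RFC 931 identity ('-' when absent)
  frank                                authenticated user
  [10/Oct/2000:13:55:36 -0700]         date and time of the request
  "GET /apache_pb.gif HTTP/1.0"        request line
  200                                  HTTP status code
  2326                                 response size in bytes
  "http://www.example.com/start.html"  referer
  "Mozilla/4.08 [en] (Win98; I ;Nav)"  user agent

The first seven of these are exactly what the parser extracts later in this tutorial.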

Overview

This monitor consists of 3 components:

  1. File Watcher
  2. Analyser/Aggregator
  3. User Interface

The File Watcher reads new lines from the monitored files, parses each log line, and puts the extracted information as log items into a message queue dedicated to the log influx.

The Analyser consumes log items and updates statistical metrics in a memory segment shared with the User Interface. When the Analyser finds that a critical metric is above or below a certain threshold, it sends an alert item into the alert message queue, which is consumed by the User Interface.

The User Interface presents the metrics to the user and displays alerts when they are raised. This program uses a text-based UI in the console, periodically updating the widgets with statistical information fetched from the shared memory. If there is a message in the alert queue, the alert status changes accordingly and the alert message is shown in the UI.

Each component runs in a process dedicated to its own job. Information flows from the File Watcher to the Analyser, then from the Analyser to the UI.

Architecture Schema

When starting the program, the main routine

  1. initializes the message queues and shared memory
  2. spawns the file watcher, analyser and UI processes
  3. waits for the processes to terminate

Here is the launching script:

if __name__ == "__main__":
    ....
    monitor = Monitor(log_files, threshold_aps)
    monitor.initialize()
    monitor.start()
    monitor.wait_for_finish()

Now let's build the 3 components to monitor our log files, starting with the File Watcher.

File Watcher

When creating a watcher process, we pass the file to watch and the message queue to which it should send parsed metrics.

class Monitor(object):
    ....
    def initialize(self):
        # watch files
        for filename in self._filenames:
            fw = FileWatcher(filename)
            proc = Process(target=fw.watch, args=(self._log_q, self._running))
            self._processes.append(proc)
    ....

While the file watcher is running, it reads new lines from the watched file, parses them into metrics and sends them to the message queue. One thing worth noting is that once it has read the last line, the next readline() call returns an empty string. In this case we let it sleep a while and try another read later.

import os
import time

class FileWatcher(object):
    def __init__(self, filename, timeout=DEFAULT_READLINE_TIMEOUT):
        self._timeout = timeout
        try:
            # open the log file and jump to the end so we only see new lines
            self._file_handle = open(filename, 'r')
            self._file_handle.seek(0, os.SEEK_END)
        except FileNotFoundError as err:
            print(err)
            raise

    def readline(self):
        line = ''
        sleep_count = 0
        while not line:
            line = self._file_handle.readline()
            if line == '':
                # reached end of file: sleep and retry, but give up after the
                # timeout so the caller can check the running flag
                sleep_count += 1
                if sleep_count * DEFAULT_READLINE_SLEEP >= self._timeout:
                    break
                time.sleep(DEFAULT_READLINE_SLEEP)

        return line

    def watch(self, log_queue, running):
        # parse every new line into a LogItem and push it to the log queue
        # until the shared running flag is cleared
        parser = LogParser()
        while running.value == 1:
            line = self.readline()
            if line == '':
                # readline timed out without new content; check the flag again
                continue
            remotehost, rfc931, authuser, date, request, status, size = parser.parse_line(
                line)
            if not request:
                # skip lines that could not be parsed
                continue
            section = parser.section_from_request(request)
            try:
                size = int(size)
            except ValueError:
                # e.g. size reported as '-' for responses without a body
                size = 0
            log_item = LogItem(remotehost, rfc931, authuser,
                               date, request, status, size, section)
            log_queue.put(log_item)
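The LogItem and LogParser types used above are not shown in this tutorial. Here is a minimal sketch of what they might look like; the namedtuple fields, the regex and the section_from_request logic are assumptions based on the log format at the top of this post, not the original implementation.

import re
from collections import namedtuple

# assumed shape of a parsed log entry
LogItem = namedtuple(
    'LogItem',
    ['remotehost', 'rfc931', 'authuser', 'date',
     'request', 'status', 'size', 'section'])

class LogParser(object):
    # illustrative regex for the combined log format shown earlier
    LINE_RE = re.compile(
        r'(?P<remotehost>\S+) (?P<rfc931>\S+) (?P<authuser>\S+) '
        r'\[(?P<date>[^\]]+)\] "(?P<request>[^"]*)" '
        r'(?P<status>\d{3}) (?P<size>\S+)')

    def parse_line(self, line):
        match = self.LINE_RE.match(line)
        if not match:
            # unparseable line: return empty fields so the caller skips it
            return '', '', '', '', '', '', ''
        return (match.group('remotehost'), match.group('rfc931'),
                match.group('authuser'), match.group('date'),
                match.group('request'), match.group('status'),
                match.group('size'))

    def section_from_request(self, request):
        # '"GET /pages/create HTTP/1.0"' has the path '/pages/create';
        # the section is the path up to the second '/', i.e. '/pages'
        parts = request.split(' ')
        if len(parts) < 2:
            return ''
        path = parts[1]
        segments = path.split('/')
        return '/' + segments[1] if len(segments) > 1 else path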

Now that we have an influx of information, it is time to aggregate it into meaningful metrics.

Analyser/Aggregator

The Analyser takes individual metrics from the message queue, aggregates them into the metrics we care about, and updates the memory shared with the UI process. Below is a snippet from the aggregate routine showing how the hit count is updated:

  1. the aggregator receives a log metric object from the message queue. This metric object was generated from an access, meaning we have a hit on a webpage.
  2. we increase the hit count
  3. if it is time to update the metrics, we update the shared memory self._aggregated_statistics with the new result.

    def aggregate(self):
        ....
        next_aggregate_time = datetime.now() + timedelta(seconds=self._frame_interval)
        self._aggregated_statistics['next_aggregate_time'] = next_aggregate_time
        while self._running.value == 1:
            try:
                log_item = self._log_q.get(timeout=LOG_QUEUE_TIMEOUT)
                frame_hit_count = frame_hit_count + 1
                ....
            except queue.Empty as err:
                log_item = None

            # aggregate results of frame
            if datetime.now() > next_aggregate_time:
                lps = 1.0 * frame_hit_count / self._frame_interval
                self._aggregated_statistics['lps_frame'] = lps
                ....

                # new frame
                frame_hit_count = 0
                frame_heat_map = dict()
                next_aggregate_time = datetime.now() + timedelta(seconds=self._frame_interval)
                self._aggregated_statistics['next_aggregate_time'] = next_aggregate_time

Having the aggregated metrics, we can now present them to operators through the User Interface.

User Interface

The User Interface gets metrics from the shared memory and the message queues populated by the Analyser/Aggregator process. The final presentation can be a web page or simple terminal output. Here I use the npyscreen package to build a text UI, which is handy when working over an SSH terminal.

A simple way to use npyscreen is to extend the npyscreen.NPSAppManaged class. Here we pass the shared memory stats_dict and the message queue receiving alerts to its __init__ function.

The main interface is a Dashboard form class containing the widgets that show the various metrics; it is added in the onStart function.

class MonitorUI(npyscreen.NPSAppManaged):
    def __init__(self, stats_dict=None, alert_q=None, running=None):
        super().__init__()
        # avoid a mutable default argument for the shared stats dictionary
        self._stats_dict = stats_dict if stats_dict is not None else {}
        self._running = running
        self._alert_q = alert_q

    def onStart(self):
        self.addForm('MAIN', Dashboard, name='HTTP Log Monitor',
                     stats=self._stats_dict, alert_q=self._alert_q)
    ...

The Dashboard class mainly has to do 2 things: place the widgets at the desired positions (create) and update the values to display (while_waiting). After updating a widget's values in while_waiting, do not forget to explicitly call its display() method to render the widget.

class Dashboard(npyscreen.Form):
    ....

    def create(self):
        # while_waiting is called whenever no key is pressed within this
        # timeout (given in tenths of a second, so 10 is roughly one second)
        self.keypress_timeout = 10

        # press Escape to quit
        self.how_exited_handers[npyscreen.wgwidget.EXITED_ESCAPE] = self.exit_application

        height, width = self.useable_space()

        height_lps_status_box = 5
        rely = 2
        self._lps_box = self.add(TrafficBox, name='Traffic', editable=False,
                                 relx=2,
                                 rely=rely,
                                 max_width=(width // 2 - 4),
                                 max_height=height_lps_status_box)
        self._lps_box.set_values(val_10s=0, val_2m=0, val_lifetime=0)

        ....

    def while_waiting(self):
        self._lps_box.set_values(
            val_10s=self._stats.get('lps_frame', 0),
            val_2m=self._stats.get('lps_scene', 0),
            val_lifetime=self._stats.get('lps_lifetime', 0)
        )
        self._lps_box.display()
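Showing alerts is not covered by the snippet above. One way to surface them is a small helper called at the end of while_waiting that drains the alert queue; this is only a sketch, assuming the queue was stored as self._alert_q in create and that a self._alert_box widget exists (both are hypothetical names, not from the original code).

    def check_alerts(self):
        # drain the alert queue (requires "import queue" for queue.Empty);
        # self._alert_q and self._alert_box are assumed attributes
        while True:
            try:
                alert = self._alert_q.get_nowait()
            except queue.Empty:
                break
            self._alert_box.value = str(alert)
            self._alert_box.display()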

Launch

Now let's put these components together and launch the monitor. We use Python's multiprocessing library to run these 3 kinds of processes simultaneously.

  1. create shared resources such as the shared memory and message queues using Manager and Queue
  2. create a process context from an entry function and its arguments: Process(target=fw.watch, args=(self._log_q, self._running))
  3. start the process: process.start()
  4. let the parent process wait for its children to terminate: for proc in self._processes: proc.join()

from multiprocessing import Process, Queue, Value, Manager
from file_watcher import FileWatcher
from user_interface import MonitorUI
....

class Monitor(object):

    def __init__(self, filenames, threshold_lps,
                 frame_interval=REFRESH_INTERVAL,
                 scene_interval=ALERT_WINDOW):
        self._filenames = filenames
        self._processes = list()
        self._resource_manager = Manager()
        self._log_q = Queue()
        self._alert_q = Queue()
        self._running = Value('b', 1)
        self._alert_threshold = Value('L', threshold_lps)
        self._aggregated_statistics = self._resource_manager.dict()
        self._ui = MonitorUI(self._aggregated_statistics,
                             self._alert_q, self._running)
        ....

    def initialize(self):
        # watch files
        for filename in self._filenames:
            fw = FileWatcher(filename)
            proc = Process(target=fw.watch, args=(self._log_q, self._running))
            self._processes.append(proc)
        # aggregate statistics
        proc = Process(target=self.aggregate)
        self._processes.append(proc)
        # UI
        proc = Process(target=self._ui.run)
        self._processes.append(proc)

    def start(self):
        self._aggregated_statistics['start_time'] = datetime.now()
        self._aggregated_statistics['next_aggregate_time'] = datetime.now()
        for process in self._processes:
            process.start()

    def stop(self):
        self._running.value = 0

    def wait_for_finish(self):
        for proc in self._processes:
            proc.join()

    def aggregate(self):
        ....

if __name__ == "__main__":
    ....
    monitor = Monitor(log_files, threshold_aps)
    monitor.initialize()
    monitor.start()
    monitor.wait_for_finish()

Sliding Window

Many metrics are calculated over a sliding window covering the most recent N minutes or the last N samples. There are many ways to implement this. Here is one of them, using a circular buffer and a metaphor of frames and scenes.

Imagine that the Analyser is required to calculate the average traffic over a sliding window of 2 minutes as well as the average traffic for the last 10 seconds. This is quite similar to filming a video: we can consider the statistics collected every 10 seconds as a frame and the sliding 2-minute window as a scene. Just as a scene in a movie is composed of multiple frames, the 2-minute statistics are an aggregation of 12 consecutive 10-second statistics.

We therefore use a circular buffer to keep every 10-second frame of statistics belonging to the 2-minute scene, so the buffer holds 12 items (2 min / 10 s = 12). To calculate the average traffic over 2 minutes, we just sum the access counters of all items in the buffer and divide by the 2-minute duration. Any metric can be calculated this way, as long as it is an aggregation function.
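To make the bookkeeping concrete, here is a standalone sketch of the idea. The SlidingWindow class is purely illustrative; the monitor itself inlines this logic in the aggregate routine shown below.

class SlidingWindow(object):
    """Keep the hit counts of the last N frames in a circular buffer."""

    def __init__(self, frames_per_scene):
        self._counts = [0] * frames_per_scene
        self._index = 0

    def push(self, frame_count):
        # overwrite the oldest frame with the newest one
        self._counts[self._index] = frame_count
        self._index = (self._index + 1) % len(self._counts)

    def total(self):
        return sum(self._counts)

# a 2-minute scene made of 10-second frames -> 12 slots
window = SlidingWindow(12)
for hits in (120, 95, 300):          # three frames of hit counts
    window.push(hits)
print(window.total() / 120.0)        # average hits per second over the scene

Note that until 12 frames have been pushed, dividing by the full 120 seconds underestimates the rate, which is why the routine below falls back to the lifetime average while the window is still filling up.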

Here is the code calculating the traffic for the last 10 seconds, the last 2 minutes, and since the beginning.

    def aggregate(self):
        total_hit_count = 0
        frame_hit_count = 0
        frame_heat_map = dict()  # per-section hit counts within the frame

        # circular buffer of per-frame hit counts covering the alert window
        frames_in_scene_hit_counts = [0 for _ in range(self._frames_per_scene)]
        frame_index_in_scene = 0
        start_time = self._aggregated_statistics['start_time']
        # before this time the scene buffer is not yet full
        alert_start_time = start_time + timedelta(seconds=self._scene_interval)
        self._aggregated_statistics['lps_frame'] = 0
        self._aggregated_statistics['lps_scene'] = 0
        self._aggregated_statistics['lps_lifetime'] = 0

        next_aggregate_time = datetime.now() + timedelta(seconds=self._frame_interval)
        while self._running.value == 1:
            try:
                log_item = self._log_q.get(timeout=LOG_QUEUE_TIMEOUT)
                frame_hit_count = frame_hit_count + 1
                frame_heat_map[log_item.section] = frame_heat_map.get(
                    log_item.section, 0) + 1
            except queue.Empty:
                log_item = None

            # aggregate results of frame
            if datetime.now() > next_aggregate_time:
                lps = 1.0 * frame_hit_count / self._frame_interval
                self._aggregated_statistics['lps_frame'] = lps
                total_hit_count = total_hit_count + frame_hit_count

                time_delta = datetime.now() - start_time
                total_lps = total_hit_count / time_delta.total_seconds()
                self._aggregated_statistics['lps_lifetime'] = total_lps

                # overwrite the oldest frame in the circular buffer
                frames_in_scene_hit_counts[frame_index_in_scene] = frame_hit_count
                frame_index_in_scene = (
                    frame_index_in_scene + 1) % self._frames_per_scene
                if datetime.now() > alert_start_time:
                    scene_lps = sum(frames_in_scene_hit_counts) * \
                        1.0 / self._scene_interval
                else:
                    # scene buffer not full yet: fall back to lifetime average
                    scene_lps = total_lps
                self._aggregated_statistics['lps_scene'] = scene_lps

                # new frame
                frame_hit_count = 0
                frame_heat_map = dict()
                next_aggregate_time = datetime.now() + timedelta(seconds=self._frame_interval)
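The alerting mentioned in the overview is left out of the routine above. Here is a minimal sketch of where it could hook in, right after lps_scene is updated; the AlertItem type and the alert_active flag (initialized to False before the loop) are assumptions for illustration, not part of the original code.

# illustrative alert message type; requires "from collections import namedtuple"
AlertItem = namedtuple('AlertItem', ['kind', 'lps', 'when'])

# inside aggregate(), right after 'lps_scene' is updated:
if scene_lps > self._alert_threshold.value and not alert_active:
    alert_active = True
    self._alert_q.put(AlertItem('HIGH_TRAFFIC', scene_lps, datetime.now()))
elif scene_lps <= self._alert_threshold.value and alert_active:
    alert_active = False
    self._alert_q.put(AlertItem('RECOVERED', scene_lps, datetime.now()))

Sending a recovery message as well lets the UI clear the alert once traffic drops back below the threshold.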

References