
Note: All posts take a practical approach and avoid lengthy theory. Everything has been tested on development servers. Please don't try any post on production servers until you are sure.

Tuesday, August 08, 2017

Analyzing/Parsing syslogs using Hive and Presto


Scenario


My company asked me to provide a solution for syslog aggregation across all environments so that the teams can analyze the logs and get insights. Logs should be captured first, then retained, and finally processed by the analyst team in the way they already query and process data with a database. The requirements are not very clear at this stage, and the volume of data cannot yet be determined.


Solution


Because the data volume is unknown and the requirements are not firm, a data warehouse is not the best fit, so we naturally turned to Hadoop HDFS for storage with Hive and Presto on top (an environment already set up in the company). Although we have Hive on Tez for processing, we opted for Presto for even better query performance. Below is the high-level architecture of the solution: a Flume agent receives the syslog events from the rsyslog client machines and stores them in an HDFS location; a Hive external partitioned table is created over that HDFS location; and finally a Presto view is created on top of the partitioned table and queried per the analysts' requirements.

[Architecture: rsyslog clients -> Flume agent (syslog TCP source -> memory channel -> HDFS sink) -> HDFS -> Hive external partitioned table -> Presto view]

Assumptions

1- The Flume agent is already configured and running under HDP; for details please see my other posts below.

Streaming Twitter Data using Apache Flume
Forward syslog to flume with rsyslog
Installing/Configuring Hortonworks Data Platform [HDP]

The following Flume agent configuration was used in HDP.

# Naming the components on the current agent. 
Logagent2.sources=SourceSyslog2
Logagent2.channels=ChannelMem2
Logagent2.sinks=SinkHDFS2

# Describing/Configuring the source 
# Use syslogudp instead to receive syslog over UDP:
#Logagent2.sources.SourceSyslog2.type=syslogudp
Logagent2.sources.SourceSyslog2.type=syslogtcp
Logagent2.sources.SourceSyslog2.host=0.0.0.0
Logagent2.sources.SourceSyslog2.port=12346
Logagent2.sources.SourceSyslog2.keepFields=true


# Describing/Configuring the channel 
Logagent2.channels.ChannelMem2.type=memory
Logagent2.channels.ChannelMem2.capacity = 1000000 
Logagent2.channels.ChannelMem2.transactionCapacity = 100000

# Describing/Configuring the sink   
Logagent2.sinks.SinkHDFS2.type=hdfs
Logagent2.sinks.SinkHDFS2.hdfs.path = /data/flume/syslogs2/%Y/%m/
Logagent2.sinks.SinkHDFS2.hdfs.fileType = DataStream 
Logagent2.sinks.SinkHDFS2.hdfs.writeFormat = Text 
Logagent2.sinks.SinkHDFS2.hdfs.batchSize = 1000
Logagent2.sinks.SinkHDFS2.hdfs.rollSize = 0 
Logagent2.sinks.SinkHDFS2.hdfs.rollCount = 10000 
Logagent2.sinks.SinkHDFS2.hdfs.filePrefix = syslog


# Binding the source and sink to the channel 
Logagent2.sources.SourceSyslog2.channels = ChannelMem2
Logagent2.sinks.SinkHDFS2.channel = ChannelMem2
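
If you run the agent manually rather than through Ambari, the launch command looks like the sketch below (assuming the configuration above is saved as /etc/flume/conf/logagent2.conf; the --name value must match the agent name Logagent2):

flume-ng agent --conf /etc/flume/conf --conf-file /etc/flume/conf/logagent2.conf --name Logagent2 -Dflume.root.logger=INFO,console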


You need to update /etc/rsyslog.conf on your rsyslog client machines (pointing at the Flume agent host and port) and restart the rsyslog daemon, e.g. service rsyslog restart.
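
A minimal sketch of the forwarding rule in /etc/rsyslog.conf, assuming the Flume agent runs on host en01 and listens on TCP port 12346 as configured above (@@ forwards over TCP; a single @ would use UDP):

*.* @@en01:12346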

2- Hive is already configured and running; for details please see my other post below.


Hive Installation and Configuration

3- The Presto cluster is already configured and running; for details please see my other post below.
Building Teradata Presto Cluster



Sample Data

Below is the sample syslog data

<13>Jul 27 16:32:06 en01 root: Test-3 from Edge Node
<30>Jul 27 16:35:27 en01 systemd: Starting Cleanup of Temporary Directories...
<30>Jul 27 16:35:27 en01 systemd: Started Cleanup of Temporary Directories.
<30>Jul 27 16:40:01 en01 systemd: Started Session 4665 of user root.
<30>Jul 27 16:40:01 en01 systemd: Starting Session 4665 of user root.
<30>Aug  2 16:01:01 en01 systemd: Starting Session 5754 of user root.

Note the two spaces in the 'Aug  2' record; they affect the fixed-offset parsing shown later.
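
As a quick sanity check of those offsets (a sketch from the Presto CLI; the fixed positions assume a two-digit priority code such as <13> or <30>), trim() absorbs the extra space in single-digit days:

select cast(trim(substr('<30>Aug  2 16:01:01 en01 systemd: Starting Session 5754 of user root.', 9, 2)) as integer) as day;
-- returns 2; for the '<13>Jul 27 ...' line the same expression returns 27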

Structure for HDFS
After reviewing the syslog data, the following structure was proposed for the files generated by the Flume agent. Each log line parses into the fields below (using the first sample record):

Month   = Jul
Day     = 27
Time    = 16:32:06
Node    = en01
Process = root or systemd
Log msg = Test-3 from Edge Node

Directory structure

data
    flume
        syslogs2
            <year>
                <month>
                    <<messages here>>


Create the folder in HDFS to have external table's data

[hdfs@nn01 ~]$ hdfs dfs -mkdir -p /data/flume/syslogs2
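
You can verify the directory (and later watch Flume's files arrive) with a listing:

[hdfs@nn01 ~]$ hdfs dfs -ls /data/flume/syslogs2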


Create external table where logs are stored

CREATE EXTERNAL TABLE flume.syslogs2(
logline STRING
) PARTITIONED BY(year int, month int)
stored as textfile;

hive> CREATE EXTERNAL TABLE flume.syslogs2(
    > logline STRING
    > ) PARTITIONED BY(year int, month int)
    > stored as textfile;
OK
Time taken: 0.158 seconds


Create the partition in Hive

Alter table flume.syslogs2 Add IF NOT EXISTS partition(year=2017, month=08)
location '/data/flume/syslogs2/2017/08';

Alter table flume.syslogs2 Add IF NOT EXISTS partition(year=2017, month=09)
location '/data/flume/syslogs2/2017/09';
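
Note that MSCK REPAIR TABLE cannot discover these partitions automatically, because the directory layout (/data/flume/syslogs2/2017/08) does not follow Hive's key=value naming convention (year=2017/month=8); each partition has to be added explicitly as above.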


hive> SHOW PARTITIONS syslogs2;
OK
year=2017/month=8
Time taken: 0.172 seconds, Fetched: 1 row(s)


Drop a partition when it is no longer needed:

Alter table flume.syslogs2 drop partition (year=2017, month=09);
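
Since flume.syslogs2 is an EXTERNAL table, dropping a partition only removes its metadata; the files under /data/flume/syslogs2/2017/09 remain in HDFS.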



Create a view in Presto that parses the syslog message/event line

create or replace view presto_vw_syslogs2 as
select year,month,
 substr(logline,2,2) Code
,substr(logline,5,3) Mnth
,cast(trim(substr(logline,9,2)) as integer) Day
,substr(logline,12,8) Time
,split(substr(logline,21),' ')[1] Host
,split(replace(split(substr(logline,21),' ')[2],':'),'[')[1] Process
,replace(split_part(replace(split(substr(logline,21),' ')[2],':'),'[',2),']') ProcessID
,split(ltrim(substr(logline,8)),' ',6)[6] Event
from flume.syslogs2;

presto:flume> create or replace view presto_vw_syslogs2 as
           -> select year,month,
           ->  substr(logline,2,2) Code
           -> ,substr(logline,5,3) Mnth
           -> ,cast(trim(substr(logline,9,2)) as integer) Day
           -> ,substr(logline,12,8) Time
           -> ,split(substr(logline,21),' ')[1] Host
           -> ,split(replace(split(substr(logline,21),' ')[2],':'),'[')[1] Process
           -> ,replace(split_part(replace(split(substr(logline,21),' ')[2],':'),'[',2),']') ProcessID
           -> ,split(ltrim(substr(logline,8)),' ',6)[6] Event
           -> from flume.syslogs2;
CREATE VIEW



presto:flume> select year,month,count(*) from presto_vw_syslogs2 group by year,month;
 year | month | _col2
------+-------+-------
 2017 |     8 |    16
(1 row)



You can also use regular expressions in your view query; Presto uses Java regular-expression pattern syntax.

create or replace view presto_vw_syslogs_regex as
select year,month
,regexp_extract(logline, '\d+') Code
,regexp_extract(logline, '[a-zA-Z]+') Month_eng
,regexp_extract(logline, '\p{Blank}\d+') Day
,regexp_extract(logline, '\d+:\d+:\d+') Time
,regexp_extract(logline, '\w+-\w+-\w+-\w+') Host
,replace(regexp_extract(logline, '\D+\[|\D+:'),'[','') Process
,replace(regexp_extract(logline, '\d+]'),']','') ProcessID
,trim(replace(substr(logline,strpos(logline,regexp_extract(logline, ':\p{Blank}')) ),':','')) Event
,logline from flume.text_syslogs_current;
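
A quick usage check of the regex view (a sketch; the column names follow the view definition above):

presto:flume> select code, month_eng, day, time, host, process, event from presto_vw_syslogs_regex limit 5;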


The same can be done in Hive; an example is given below.

create or replace view flume.hive_vw_syslogs_current_regex as
select year, month
,regexp_extract(logline, '[0-9]+', 0) Code
,regexp_extract(logline, '[a-zA-Z]+', 0) Month_eng
,regexp_extract(logline, '\\s\\d\\d', 0) Day
,regexp_extract(logline, '\\d*:\\d*:\\d*', 0) Time
,regexp_extract(logline, '\\w{3}-\\w{3}-\\w{2}-\\w{4}', 0) Host
,translate(regexp_extract(logline, '[a-zA-Z]+\\[|[a-zA-Z]+:', 0), '[', '') Process
,translate(regexp_extract(logline, '\\d*]', 0), ']', '') ProcessID
--,regexp_extract(logline, ':\\s', 0) Event
,trim(translate(substring(logline, instr(logline, regexp_extract(logline, ':\\s', 0))), ':', '')) Event
--,logline
from flume.text_syslogs_current t;


Test a few more queries

-- Year and Month specific result
SELECT * FROM flume.presto_vw_syslogs2
where year=2017 and month=08
order by time desc

-- Last 200 occurrences
SELECT * FROM flume.presto_vw_syslogs2
where year=2017 and month=08 and day = 7
order by time desc
limit 200

-- Group by Process
select process,count(1) tot_events
from flume.presto_vw_syslogs2 l
group by Process

-- Group by Code
select  code,count(1) tot_events from flume.presto_vw_syslogs2 l
group by code
order by tot_events desc

-- By Date
select *  from flume.presto_vw_syslogs2 l
where year = year(current_date) and month = month(current_date) and day =day(current_date)-1

--By Event
select event,count(1)
from flume.presto_vw_syslogs2 l
group by event
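
As one more sketch, the hour can be pulled from the parsed time column to see how events are distributed across the day:

-- Events per hour
select substr(time,1,2) hr, count(1) tot_events
from flume.presto_vw_syslogs2 l
where year=2017 and month=08
group by substr(time,1,2)
order by 1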


We used Presto string functions for the parsing; a short description of each is given below for reference.

split(string, delimiter, limit) → array<varchar>
Splits string on delimiter and returns an array of size at most limit. The last element in the array always contains everything left in the string. limit must be a positive number.

split_part(string, delimiter, index) → varchar
Splits string on delimiter and returns the field index. Field indexes start with 1. If the index is larger than the number of fields, then null is returned.

replace(string, search) → varchar
Removes all instances of search from string.

replace(string, search, replace) → varchar
Replaces all instances of search with replace in string.

substr(string, start) → varchar
Returns the rest of string from the starting position start. Positions start with 1. A negative starting position is interpreted as being relative to the end of the string.

substr(string, start, length) → varchar
Returns a substring from string of length length from the starting position start. Positions start with 1. A negative starting position is interpreted as being relative to the end of the string.
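
A quick way to try these functions directly from the Presto CLI (the literals are illustrative):

select split('a b c', ' ')[2];        -- b
select split_part('a b c', ' ', 3);   -- c
select replace('root:', ':');         -- root
select substr('<13>Jul 27', 5, 3);    -- Jul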
