Almost any computer system today generates a very large volume of logs or data every day. As the system grows, it is no longer feasible to store this debugging data in a database: the data is immutable and is only used for analytics and fault resolution. So organisations tend to store it in files that reside on local disk storage. We are going to extract logs from a 16 GB .txt or .log file containing millions of lines, using Golang.
Let’s open the file first. We will be using standard Go os.File for any file IO.
f, err := os.Open(fileName)
if err != nil {
	fmt.Println("cannot open the file:", err)
	return
}
// UPDATE: close after checking error
defer f.Close() // do not forget to close the file
Once the file is opened, we have two options for how to proceed:
Read the file line by line, which reduces the strain on memory but spends more time on IO.
Read the entire file into memory at once and process it there, which consumes more memory but significantly reduces the processing time.
Since our file is far too large (16 GB), we can’t load it into memory in one go. But the first option isn’t feasible for us either, as we want to process the file within seconds. But guess what, there is a third option. Voila…! Instead of loading the entire file into memory, we will load it in chunks using bufio.NewReader(), available in Go.
r := bufio.NewReader(f)
for {
	buf := make([]byte, 4*1024) // the chunk size
	n, err := r.Read(buf)       // load the next chunk into the buffer
	buf = buf[:n]
	if n == 0 {
		// Nothing was read: either we reached the end of the file
		// or the read failed, so stop in both cases.
		if err != nil && err != io.EOF {
			fmt.Println(err)
		}
		break
	}
}
Once we have a chunk, we will spawn a goroutine (Go's lightweight thread) to process it concurrently with the other chunks. The code above changes to:
// sync pools to reuse memory and decrease the pressure on the garbage collector
linesPool := sync.Pool{New: func() interface{} {
	lines := make([]byte, 500*1024)
	return lines
}}
stringPool := sync.Pool{New: func() interface{} {
	lines := ""
	return lines
}}
slicePool := sync.Pool{New: func() interface{} {
	lines := make([]string, 100)
	return lines
}}
r := bufio.NewReader(f)
var wg sync.WaitGroup // wait group to keep track of all goroutines
for {
	buf := linesPool.Get().([]byte)
	buf = buf[:cap(buf)] // a recycled slice may have been shortened, so restore its full length
	n, err := r.Read(buf)
	buf = buf[:n]
	if n == 0 {
		// end of file or a failed read: stop reading in both cases
		if err != nil && err != io.EOF {
			fmt.Println(err)
		}
		break
	}
	// read the rest of the current line so the chunk ends on a complete line
	nextUntilNewline, err := r.ReadBytes('\n')
	if err != nil && err != io.EOF {
		fmt.Println(err)
		break
	}
	// append even at EOF, so a final line without a trailing newline is not lost
	buf = append(buf, nextUntilNewline...)
	wg.Add(1)
	go func() {
		// process each chunk concurrently
		// start -> log start time, end -> log end time
		ProcessChunk(buf, &linesPool, &stringPool, &slicePool, start, end)
		wg.Done()
	}()
}
wg.Wait() // wait for all chunks to be processed
The code above introduces two new optimizations:
sync.Pool is a pool of reusable instances that reduces the pressure on the garbage collector. We reuse the memory allocated for the various slices, which lowers memory consumption and speeds up the work significantly.
Goroutines let us process the buffered chunks concurrently, which significantly increases the processing speed.
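To make the sync.Pool idea concrete, here is a minimal sketch of the Get / use / Put cycle on a byte buffer. The bufPool variable, the shout function, and the 4 KB capacity are invented for this example and do not appear in the article's code.

```go
package main

import (
	"fmt"
	"sync"
)

// bufPool hands out reusable byte slices. The 4 KB capacity is an
// arbitrary choice for this sketch.
var bufPool = sync.Pool{
	New: func() interface{} {
		return make([]byte, 0, 4*1024)
	},
}

// shout (hypothetical) builds its result in a pooled buffer instead of
// allocating a fresh one on every call.
func shout(s string) string {
	buf := bufPool.Get().([]byte)
	buf = buf[:0] // keep the backing array, drop any old contents
	buf = append(buf, s...)
	buf = append(buf, '!')
	out := string(buf) // copy the data out before giving the buffer back
	bufPool.Put(buf)
	return out
}

func main() {
	fmt.Println(shout("hello"))
	fmt.Println(shout("again")) // may reuse the buffer from the first call
}
```

The important habit is to copy out whatever you need before calling Put, because another goroutine may pick up and overwrite the buffer right after it returns to the pool.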
Now let’s implement the ProcessChunk function, which processes log lines of the format
2020-01-31T20:12:38.1234Z, Some Field, Other Field, And so on, Till new line,...\n
We will extract the logs whose timestamps fall within the time range provided at the command line.
func ProcessChunk(chunk []byte, linesPool *sync.Pool, stringPool *sync.Pool, slicePool *sync.Pool, start time.Time, end time.Time) {
	// another wait group to process every chunk further
	var wg2 sync.WaitGroup
	logs := stringPool.Get().(string)
	logs = string(chunk)
	linesPool.Put(chunk) // put the chunk back in the pool
	// split the string by "\n", so that we have a slice of logs
	logsSlice := strings.Split(logs, "\n")
	stringPool.Put(logs) // put the string back in the pool
	chunkSize := 100 // process a batch of 100 logs per goroutine
	n := len(logsSlice)
	noOfThread := n / chunkSize
	if n%chunkSize != 0 { // account for a final, partial batch
		noOfThread++
	}
	// traverse the chunk, one batch at a time
	for i := 0; i < noOfThread; i++ {
		// index range [s, e) of this batch, clamped to the slice length
		s := i * chunkSize
		e := s + chunkSize
		if e > n {
			e = n
		}
		wg2.Add(1)
		// process each batch in a separate goroutine
		go func(s int, e int) {
			defer wg2.Done() // always signal completion, even on an early return
			for i := s; i < e; i++ {
				text := logsSlice[i]
				if len(text) == 0 {
					continue
				}
				logParts := strings.SplitN(text, ",", 2)
				logCreationTimeString := logParts[0]
				logCreationTime, err := time.Parse("2006-01-02T15:04:05.0000Z", logCreationTimeString)
				if err != nil {
					fmt.Printf("\n Could not parse the time: %s for log: %v", logCreationTimeString, text)
					return
				}
				// check if the log's timestamp falls within our desired period
				if logCreationTime.After(start) && logCreationTime.Before(end) {
					fmt.Println(text)
				}
			}
		}(s, e) // passing the indexes for processing
	}
	wg2.Wait() // wait for every batch of this chunk to finish
	logsSlice = nil
}
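The index arithmetic that splits the lines into batches of 100 is the easiest part of this function to get wrong, so here it is in isolation. The batchBounds helper is a made-up name for this sketch; it only computes the half-open [start, end) ranges that each goroutine would work on.

```go
package main

import "fmt"

// batchBounds (hypothetical helper) splits n items into consecutive
// batches of at most size items and returns each batch as a
// half-open index range [start, end).
func batchBounds(n, size int) [][2]int {
	if size <= 0 {
		return nil // avoid an endless loop on a nonsensical batch size
	}
	var bounds [][2]int
	for start := 0; start < n; start += size {
		end := start + size
		if end > n {
			end = n // the last batch may be shorter
		}
		bounds = append(bounds, [2]int{start, end})
	}
	return bounds
}

func main() {
	// 250 lines in batches of 100 gives two full batches and one of 50.
	fmt.Println(batchBounds(250, 100)) // prints [[0 100] [100 200] [200 250]]
}
```

Clamping the end index with a plain integer comparison avoids the round trip through float64 that math.Min would require.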
The code above was benchmarked against a 16 GB log file, and extracting the logs took roughly 25 seconds. The entire working code is given below.
You can reach out to me at ohm.patel1997@gmail.com. Questions and improvements are most welcome.😉