Go 监控文件变化
试想如下需求:我们需要监控某个日志文件,当有新的日志被写入时,读出新的部分并进行处理(如解析内容,并分类存储,发出通知)
在写项目时,刚好遇到了这个问题,所以就思考如何实现
思路其实很明确,尽管大部分高级语言已经把文件读取封装了,但是如果写过 C/C++,应该是可以知道读入文件是可以一个字符一个字符读取(在算法竞赛中,这样读取甚至可能可以提升速度),也可以通过seek()
移动到指定位置(也即可以直接跳到任意位置开始读数据)。
大概思路如下:
- 以只读方式打开文件
- 监控文件更改
- 发现文件被改动
- 从当前读入指针开始读取,直到结束
- 读入的数据就是新增的数据
使用 fsnotify 进行读取
fsnotify 是一个监控文件修改的库,可以在文件被创建、写入、删除、重命名、修改时触发通知。
package main import ( "bytes" "io" "os" "path" "github.com/OhYee/rainbow/errors" "github.com/OhYee/rainbow/log" "gopkg.in/fsnotify.v1" ) func watch(file string) { watch, err := fsnotify.NewWatcher() if err != nil { log.Error.Println(errors.ShowStack(err)) return } defer watch.Close() f, err := os.OpenFile(file, os.O_RDONLY|os.O_CREATE, os.ModePerm) if err != nil { log.Error.Println(errors.ShowStack(err)) return } defer f.Close() // 读入已有的部分 buf := bytes.NewBuffer([]byte{}) b := make([]byte, 64) for { n, err := f.Read(b) if err != nil && err != io.EOF { log.Error.Println(errors.ShowStack(err)) return } if n == 0 { break } // buf.Write(b[:n]) } buf.Reset() //添加要监控的对象,文件或文件夹 err = watch.Add(file) if err != nil { log.Error.Println(errors.ShowStack(err)) return } log.Debug.Println("start watch", file) for { select { case ev := <-watch.Events: { //判断事件发生的类型,如下5种 // Create 创建 // Write 写入 // Remove 删除 // Rename 重命名 // Chmod 修改权限 // if ev.Op&fsnotify.Create == fsnotify.Create { // log.Debug.Println("创建文件 : ", ev.Name) // } if ev.Op&fsnotify.Write == fsnotify.Write { // log.Debug.Println("写入文件 : ", ev.Name, ev.String()) for { n, err := f.Read(b) if err != nil && err != io.EOF { log.Error.Println(errors.ShowStack(err)) return } if n == 0 { break } buf.Write(b[:n]) } log.Debug.Println(buf.String()) buf.Reset() } // if ev.Op&fsnotify.Remove == fsnotify.Remove { // log.Debug.Println("删除文件 : ", ev.Name) // } // if ev.Op&fsnotify.Rename == fsnotify.Rename { // log.Debug.Println("重命名文件 : ", ev.Name) // } // if ev.Op&fsnotify.Chmod == fsnotify.Chmod { // log.Debug.Println("修改权限 : ", ev.Name) // } } case err := <-watch.Errors: { log.Debug.Println("error : ", err) return } } } } func main() { logFile := path.Join("./", "log.log") watch(logFile) for { } }
思路上,这么处理没有任何问题,但是在实际使用中就出现了问题,单次读入的数据可能不是完整的。导致这个的原因可能是读取的同时有文件正在写入,并且还未写入完整,从而导致只读入了一半。因此,我们应该确保每次都读到换行符再结束。
每次读入一个换行符
为了避免上述问题,需要确保每次读入只有是完整的一行(读到换行符)时,才将数据输出,否则结束读入,重新设置读入指针的位置到行首(原本位置)
具体代码如下
package main import ( "bufio" "io" "io/ioutil" "os" "path" "strings" "github.com/OhYee/rainbow/errors" "github.com/OhYee/rainbow/log" "gopkg.in/fsnotify.v1" ) func main() { logFile := path.Join("./", "log.log") w, old, err := New(logFile) if err != nil { log.Error.Println(errors.ShowStack(err)) os.Exit(1) } log.Info.Println(old) err = w.Watch(func(s string) { log.Info.Println(s) }) if err != nil { log.Error.Println(errors.ShowStack(err)) } defer w.Close() for { } } // Watcher 监控文件添加行 type Watcher struct { f *os.File r *bufio.Reader close chan bool } // New 初始化一个监控程序 func New(filename string) (w *Watcher, oldValue string, err error) { f, err := os.OpenFile(filename, os.O_RDONLY|os.O_CREATE, os.ModePerm) if err != nil { return } // 读入原本的内容 b, err := ioutil.ReadAll(f) if err != nil { return } oldValue = string(b) w = &Watcher{ f: f, r: bufio.NewReader(f), close: make(chan bool), } return } // Close 结束监控,关闭文件 func (w *Watcher) Close() { w.close <- true w.f.Close() } // Watch 开始进行监控 func (w *Watcher) Watch(callback func(line string)) (err error) { defer errors.Wrapper(&err) //添加要监控的对象,文件或文件夹 watch, err := fsnotify.NewWatcher() if err != nil { return } err = watch.Add(w.f.Name()) if err != nil { return } log.Debug.Println("start watch", w.f.Name()) for { select { case ev := <-watch.Events: if ev.Op&fsnotify.Write == fsnotify.Write { // 监控到文件写入 for { line, err := w.ReadLine() if err != nil && err != io.EOF { return err } if err == nil { // 正确读入一行 go callback(line) } else { // 当前是由于 EOF 结束,后面无内容,不需要继续读入下一行 break } } } case err = <-watch.Errors: return case <-w.close: return nil } } } // ReadLine 读入一行的数据 func (w *Watcher) ReadLine() (line string, err error) { pos, err := w.f.Seek(0, 1) if err != nil { return } line, err = w.r.ReadString('\n') if err == io.EOF { // 未读到换行符,读到 EOF // 放弃本次读取 // 回退到原本的位置 log.Debug.Println("读到 EOF,回退") w.f.Seek(pos, 0) return } if err != nil { // 存在其他错误 return } // 清楚末尾的 \n line = strings.TrimRight(line, "\n") return }
为了方面使用,上述功能已经在 v1.1.2 加入至github.com/OhYee/goutils/file
,可以直接下载yi'lai
package main import ( "os" "path" "github.com/OhYee/goutils/file" "github.com/OhYee/rainbow/errors" "github.com/OhYee/rainbow/log" ) func main() { logFile := path.Join("./", "log.log") w, old, err := file.New(logFile) if err != nil { log.Error.Println(errors.ShowStack(err)) os.Exit(1) } log.Info.Println(old) err = w.Watch(func(s string) { log.Info.Println(s) }) if err != nil { log.Error.Println(errors.ShowStack(err)) } defer w.Close() for { } }