多线程处理大量文件_.NET_编程开发_程序员俱乐部

中国优秀的程序员网站程序员频道CXYCLUB技术地图
热搜:
更多>>
 
您所在的位置: 程序员俱乐部 > 编程开发 > .NET > 多线程处理大量文件

多线程处理大量文件

 2014/7/27 2:45:54  AnonWang  程序员俱乐部  我要评论(0)
  • 摘要:上周做了一个多线程处理大量文件的功能一是记录二是分享三是请博友指出不足更多的了解多线程。1.任务:将大量(大约5G)一目录下有日期规则命名的html文件按照年月日三个层次目录存放,目的是为了提高文件检索效率。2.具体实现:开启10个线程将文件拷贝到目标文件夹;不符合要求的文件拷贝到别处;记录错误信息和不符合要求的信息;循环判断状态执行完毕给出提示。3.开始设想和后来出现问题:开了10个线程处理所有文件,如果一文件已在目标文件下存在则不处理
  • 标签:文件 多线程 线程

上周做了一个多线程处理大量文件的功能 一是记录 二是分享 三是请博友指出不足 更多的了解多线程。

1.任务:将大量(大约5G)一目录下有日期规则命名的html文件按照年月日三个层次目录存放,目的是为了提高文件检索效率。

2.具体实现:开启10个线程 将文件拷贝到目标文件夹;不符合要求的文件拷贝到别处;记录错误信息和不符合要求的信息;循环判断状态 执行完毕给出提示。

3.开始设想和后来出现问题:

开了10个线程 处理所有文件,如果一文件已在目标文件下存在 则不处理,但是线程间几乎是同时进行的 这样就会报错"该文件已被另一线程占用"。这样处理是不行的 于是就改善了。让每个线程按顺序处理固定条数的文件,则每个线程不会处理处理同个文件。

4.代码实现:

 声明变量

    /    /App.config 配置路径
        //源文件路径
        string sourcePath = ConfigurationManager.AppSettings["sourcePath"];
        //目标路径
        string toPath = ConfigurationManager.AppSettings["toPath"];
        //错误文件存放路径
        string errorLogPath = "\\log";
        //文件总数
        int totalFileCount;
        //复制文件数
        private int operaFileCount;
        //未复制文件数
        int notCopyFileCount;
        //文件名称过长文件数
        int longNameFileCount;
        //线程数
        int threadCount = 10;

 

将所有文件名称进行分批处理 分页方法
        /// <summary>
        /// 将所有文件名称进行分页
        /// </summary>
        /// <param name="PageSize">每页条数</param>
        /// <param name="CurPage">当前页</param>
        /// <param name="objs">集合</param>
        /// <returns></returns>
        public static List<string> QueryByPage(int PageSize, int CurPage, List<string> objs)
        {
            return objs.Take(PageSize * CurPage).Skip(PageSize * (CurPage - 1)).ToList();
        }
      
        public class Page
        {
            public int pageIndex { get; set; }
            public int pageCount { get; set; }
            public int pageSize { get; set; }
            public List<string> list { get; set; }
        }

拷贝文件的方法

        #region DoWork方法
        private string errorFieName;
        public void DoWork(object obj)
        {
            Object locker = new object();
            Page page = (Page)obj;
            Console.WriteLine(ThreadName());
            int PageSize = page.pageSize;
            int CurPage = page.pageIndex;
            var grouList = QueryByPage(PageSize, CurPage, page.list);
            foreach (string file in grouList)
            {
                string fileName = Path.GetFileName(file);
                errorFieName = fileName;

                if (fileName != null && fileName.Length != 18)
                {

                    Console.WriteLine(fileName + "文件名称不符合要求!");
                    CopyErrorFile(fileName);
                    Write(fileName + "文件名称不符合要求!");
                    continue;
                }
                //Console.WriteLine(fileName.Length);
                try
                {
                    //截取文件名称 源文件名称规则 ABC2014200...html 意思是:2014年第200天 可以推算出年月日
                    string subYearMonth = fileName.Substring(3, 5);
                    int yearNum = 0;
                    int dayNum = 0;
                    int.TryParse(subYearMonth.Substring(0, 2), out yearNum);
                    int.TryParse(subYearMonth.Substring(2, 3), out dayNum);
                    int.TryParse("20" + yearNum, out yearNum);
                    if (yearNum < 1 || dayNum < 1)
                    {
                        Console.WriteLine(fileName + "文件名称不符合要求!");
                        CopyErrorFile(fileName);
                        //Write(fileName + "文件名称不符合要求!");
                        continue;
                    }
                    //声明日期
                    DateTime date = new DateTime();
                    date = date.AddYears(yearNum).AddYears(-1);
                    date = date.AddDays(dayNum).AddDays(-1);
                    string fullSavePath = string.Format("{0}\\{1}\\{2}\\{3}\\", toPath, yearNum, date.Month, date.Day);
                    if (!Directory.Exists(fullSavePath))
                    {
                        Directory.CreateDirectory(fullSavePath);
                    }
                    lock (fullSavePath)
                    {

                        File.Copy(file, fullSavePath + fileName, true);
                        operaFileCount++;
                        //Write("处理完成:" + fileName);
                    }
                }
                catch (Exception ex)
                {
                    Console.WriteLine("文件名称:" + errorFieName + "处理错误:" + ex.Message);
                    Write(ex.Message);
                    throw new Exception(ex.Message);
                }
            }
        }
        #endregion    

循环执行线程

        public void CopyFile()
        {
            //开始方法时删除上次记录
            DeleteLog();
            List<Thread> threadList = new List<Thread>(); ;
            for (int i = 0; i < threadCount; i++)
            {
                try
                {
                    if (!Directory.Exists(toPath))
                    {
                        Directory.CreateDirectory(toPath);
                    }
                    //string[] fileNames = Directory.GetFileSystemEntries(sourcePath);
                    string[] fileNames = Directory.GetFiles(sourcePath);
                    var fileNameList = fileNames.ToList();
                    totalFileCount=fileNames.Length;
                   
                    //共threadCount个线程 每个线程执行oneThreadNameCount条
                    int oneThreadNameCount = (int)Math.Ceiling(((double)fileNames.Length) / threadCount);
                    Page page;
                    //当前执行的线程
                    page = new Page { pageIndex = i + 1, pageSize = oneThreadNameCount, list = fileNameList };
                    threadList.Add(new Thread(new ParameterizedThreadStart(DoWork)));
                    threadList[i].Start(page);
                    threadList[i].Name = i + "_thread";
                }

                catch (Exception ex)
                {
                    Console.WriteLine("错误信息:" + ex.Message);
                    Write(ex.Message);
                }
            }
            //判断线程执行情况
            bool isRanning;
            bool isComplete = false;
            while (!isComplete)
            {
                //2秒判断一次
                Thread.Sleep(2000);
                isRanning = false;
                foreach (var item in threadList)
                {
                    if (item.ThreadState == ThreadState.Running)
                    {
                        isRanning = true;
                    }
                }
                if (!isRanning) 
                { 
                    isComplete = true;
                    Console.WriteLine();
                    Console.WriteLine("处理结果 共" + totalFileCount+";已处理:"+operaFileCount);
                    Console.WriteLine("不符合要求:" + notCopyFileCount + "(已拷贝到errorfile文件夹);无法拷贝名称过长文件:" + longNameFileCount);
                    Console.WriteLine("请查看文件夹" + toPath);
                }
            }
            Console.ReadKey();
        }

辅助方法

        #region copy不符合要求文件
        private void CopyErrorFile(string fileName)
        {
            //实际长度超过222报错,并且无法抛出异常
            if (fileName.Length > 180)
            {
                longNameFileCount += 1;
                Write(fileName + "名称过长!");
                return;
            }
            notCopyFileCount += 1;
            string strErrorPath  =toPath+"\\errorfile";
            if(!Directory.Exists(strErrorPath)) Directory.CreateDirectory(strErrorPath);
            File.Copy(sourcePath+"\\"+fileName, toPath+"\\errorfile\\" + fileName, true);
        }
        #endregion

        private void DeleteLog()
        {
            try
            {
                if (!Directory.Exists(toPath + errorLogPath)) return;
                foreach (string var in Directory.GetFiles(toPath + errorLogPath))
                {
                    File.Delete(var);
                }
            }
            catch
            {
                //Directory.CreateDirectory(toPath + errorLogPath);
            }

        }
        private void Write(string message)
        {
            object obj = new object();
            lock (obj)
            {
                try
                {
                    string logPath=toPath+errorLogPath;
                    if (!Directory.Exists(logPath)) Directory.CreateDirectory(logPath);
                    Thread primaryThread = Thread.CurrentThread;
                    //当前线程名称
                    string threadName = primaryThread.Name;
                    using (TextWriter sw = new StreamWriter(logPath+"\\"+ThreadName()+"_error.txt", true))
                    {
                        sw.WriteLine("错误信息:" + message);
                        sw.Dispose();
                    }
                   
                }
                catch (Exception ex)
                {
                    Console.WriteLine("错误信息:" + ex.Message);
                }
            }


        }

 

发表评论
用户名: 匿名