CmdReadCommonTable.cs 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. using ExcelDataReader;
  2. using System.Collections.Concurrent;
  3. using System.Data;
  4. namespace etoy
  5. {
  /// <summary>
  /// Command that reads every <c>*.csv</c> file directly inside <c>Context.Option.CsvOutput</c>,
  /// parses each loaded <see cref="DataTable"/> with <c>CommonTableCsvParser</c> and, when no file
  /// failed, adds the parsed tables to <c>Context.Blackboard.Tables</c> sorted by name.
  /// </summary>
  /// <remarks>
  /// <see cref="OnProcess"/> starts one IO task per file plus a single parsing task. IO tasks push
  /// loaded tables into a queue; the parsing task polls that queue until the number of parsed tables
  /// reaches <see cref="TotalCount"/>, which is lowered by one for every file that fails.
  /// </remarks>
  class CmdReadCommonTable : Command
  {
      const string SEARCH_PATTERN = "*.csv";

      // Tables loaded from disk that still have to be parsed.
      readonly ConcurrentQueue<DataTableWrapper> _waitingParses = new ConcurrentQueue<DataTableWrapper>();

      // Successfully parsed tables.
      readonly ConcurrentQueue<Table> _parseDones = new ConcurrentQueue<Table>();

      // Failures collected from both the IO tasks and the parsing task.
      readonly ConcurrentQueue<ErrorData> _errorConfigs = new ConcurrentQueue<ErrorData>();

      readonly CommonTableCsvParser _parser = new CommonTableCsvParser();

      ExcelReaderConfiguration _excelReaderConfiguration;

      // Number of files still expected to produce a parsed table; touched from several tasks,
      // so it is only accessed through Volatile/Interlocked.
      int _totalCount;

      /// <summary>
      /// Number of files that are still expected to end up as a parsed table.
      /// </summary>
      public int TotalCount
      {
          get { return Volatile.Read(ref _totalCount); }
          set { Volatile.Write(ref _totalCount, value); }
      }

      public override string Description => "读取Common表";

      /// <summary>
      /// Enumerates the csv files and starts the IO tasks and the parsing task.
      /// </summary>
      /// <exception cref="Exception">Thrown when the directory contains no csv file.</exception>
      protected override void OnProcess()
      {
          var csvs = Directory.GetFiles(Context.Option.CsvOutput, SEARCH_PATTERN, SearchOption.TopDirectoryOnly);
          if (csvs.Length <= 0)
          {
              throw new Exception($"{Context.Option.CsvOutput} 没有*.csv文件...");
          }

          TotalCount = csvs.Length;
          _excelReaderConfiguration = new ExcelReaderConfiguration() { AutodetectSeparators = new char[] { ',' } };

          // Reset every queue, including the error queue, so a second run does not
          // report failures left over from a previous one.
          _parseDones.Clear();
          _waitingParses.Clear();
          _errorConfigs.Clear();

          foreach (var file in csvs)
          {
              Task.Factory.StartNew(OnThreadIO, file.ToPath());
          }

          Task.Factory.StartNew(OnThreadParse);
      }

      /// <summary>
      /// Parsing task: drains the waiting queue until every expected table is parsed, then either
      /// reports the collected errors or publishes the tables and completes the command.
      /// </summary>
      void OnThreadParse()
      {
          while (_parseDones.Count < TotalCount)
          {
              while (_waitingParses.TryDequeue(out var wrapper))
              {
                  try
                  {
                      var table = _parser.Parse(wrapper.Table, wrapper.Path);
                      _parseDones.Enqueue(table);
                      SetProgress(_parseDones.Count / (float)TotalCount);
                  }
                  catch (Exception e)
                  {
                      RecordFailure(e, wrapper.Path);
                  }
              }

              // Nothing to parse right now; give the IO tasks a moment to produce more work.
              Thread.Sleep(5);
          }

          if (_errorConfigs.Count > 0)
          {
              var message = string.Join('\n', _errorConfigs.Select(e => $"## ERROR: {e}"));
              SetException(new Exception(message));
              return;
          }

          while (_parseDones.TryDequeue(out var table))
          {
              Context.Blackboard.Tables.Add(table);
          }

          // Sort the tables by name.
          Context.Blackboard.Tables.Sort((a, b) => a.Name.CompareTo(b.Name));
          Completed();
      }

      /// <summary>
      /// IO task entry point: loads a single csv file and records a failure if loading throws.
      /// </summary>
      /// <param name="state">The file path passed to <c>Task.Factory.StartNew</c>.</param>
      void OnThreadIO(object state)
      {
          try
          {
              ReadSingle(state as string);
          }
          catch (Exception e)
          {
              RecordFailure(e, state as string);
          }
      }

      /// <summary>
      /// Records a failed file and lowers the expected count by one.
      /// </summary>
      /// <remarks>
      /// The error is enqueued BEFORE the count is decremented: the parsing task leaves its loop as
      /// soon as the count drops far enough, and it must already be able to see the error by then.
      /// The decrement is a single atomic operation so concurrent failures cannot lose an update.
      /// </remarks>
      void RecordFailure(Exception exception, string path)
      {
          _errorConfigs.Enqueue(new ErrorData() { Exception = exception, Path = path });
          Interlocked.Decrement(ref _totalCount);
      }

      /// <summary>
      /// Opens <paramref name="file"/> as csv and queues every table of the resulting data set for parsing.
      /// </summary>
      /// <param name="file">Path of the csv file to read.</param>
      void ReadSingle(string file)
      {
          // using declarations dispose in reverse order (reader first, then the stream),
          // and they also run when AsDataSet() throws.
          using Stream csv = new FileStream(file, FileMode.Open, FileAccess.Read);
          using IExcelDataReader reader = ExcelReaderFactory.CreateCsvReader(csv, _excelReaderConfiguration);

          DataSet dataSet = reader.AsDataSet();
          foreach (DataTable dt in dataSet.Tables)
          {
              _waitingParses.Enqueue(new DataTableWrapper()
              {
                  Table = dt,
                  Path = file,
              });
          }
      }

      /// <summary>A loaded table together with the path of the file it came from.</summary>
      struct DataTableWrapper
      {
          public DataTable Table;
          public string Path;
      }

      /// <summary>An exception together with the path of the file that caused it.</summary>
      struct ErrorData
      {
          public Exception Exception;
          public string Path;

          public override string ToString() => $"{Exception?.Message} {Path}";
      }
  }
  112. }