public class RowMutation { public string Key { get; set; } public IList<CellMutation> Mutations { get; set; } public RowMutation() { } public RowMutation(string key) { Key = key; } public RowMutation(string key, IList<CellMutation> mutations) { Key = key; Mutations = mutations; } } public class CellMutation { public string ColumnName { get; set; } public string DataValue { get; set; } public CellMutation() { } public CellMutation(string name) { ColumnName = name; } public CellMutation(string name, string value) { ColumnName = name; DataValue = value; } }View Code
2.引用aquiles命名空间。
using Aquiles.Helpers.Encoders; using Aquiles.Cassandra10; using Aquiles.Core.Cluster;
3.初始化成员变量
private string _clusterName { get; set; } private ConsistencyLevel _consistencyLevel { get; set; } private string _keyspaceName { get; set; } private ICluster _cluster = null; public AquilesDemo() { _clusterName = "xxxx"; _consistencyLevel = ConsistencyLevel.LOCAL_QUORUM; _keyspaceName = "xxxx"; _cluster = AquilesHelper.RetrieveCluster(_clusterName); }
4.建立一个通用的修改多行多列的方法,其它改变方式可以在此基础上完成调用,如修改一多一列,一行多列等。
public void Mutate(string columnFamily, IList<RowMutation> rowMutations) { if (string.IsNullOrWhiteSpace(columnFamily)) throw new ArgumentNullException("columnFamily"); rowMutations = PrepareMutationList(rowMutations); Dictionary<byte[], Dictionary<string, List<Apache.Cassandra.Mutation>>> mutation_map = new Dictionary<byte[], Dictionary<string, List<Apache.Cassandra.Mutation>>>(); foreach (var rowMutation in rowMutations) { byte[] key = ByteEncoderHelper.UTF8Encoder.ToByteArray(rowMutation.Key); Dictionary<string, List<Apache.Cassandra.Mutation>> cfMutation = new Dictionary<string, List<Apache.Cassandra.Mutation>>(); List<Apache.Cassandra.Mutation> mutationList = new List<Apache.Cassandra.Mutation>(); foreach (CellMutation cellMutation in rowMutation.Mutations) { if (cellMutation.DataValue == null) continue; Apache.Cassandra.Mutation mutation = new Apache.Cassandra.Mutation() { Column_or_supercolumn = new ColumnOrSuperColumn() { Column = new Column() { Name = ByteEncoderHelper.UTF8Encoder.ToByteArray(cellMutation.ColumnName), Timestamp = DateTime.Now.ToUnixTimestamp(), Value = ByteEncoderHelper.UTF8Encoder.ToByteArray(cellMutation.DataValue), }, }, }; mutationList.Add(mutation); } if (mutationList.Count > 0) { cfMutation.Add(columnFamily, mutationList); mutation_map.Add(key, cfMutation); } } if (mutation_map.Count == 0) return; _cluster.Execute(new ExecutionBlock(delegate(Apache.Cassandra.Cassandra.Client client) { client.batch_mutate(mutation_map, _consistencyLevel); return null; }), _keyspaceName); }
5. 由于调用端可能会将同一行的修改分为多个RowMutation传入,所以上面的代码使用PrepareMutationList方法根据RowKey作了聚合处理。
/// <summary> /// 聚合列表中相同rowkey的项为同一Mutation。 /// </summary> /// <param name="rowMutations"></param> /// <returns></returns> private IList<RowMutation> PrepareMutationList(IList<RowMutation> rowMutations) { if (rowMutations == null) return null; //按rowkey分组 var rowMutationGroups = rowMutations.GroupBy(rm => rm.Key); //分组后的总数量相同,则认为rowkey在列表中是唯一的,不用做聚合处理 if (rowMutations.Count == rowMutationGroups.Count()) return rowMutations; //遍历分组结果 IList<RowMutation> result = new List<RowMutation>(); foreach (var rmg in rowMutationGroups) { RowMutation rowMutation = new RowMutation(); rowMutation.Key = rmg.Key; //将同一分组的所有CellMutation放在一个列表中 List<CellMutation> cellMutations = new List<CellMutation>(); foreach (var rm in rmg) { cellMutations.AddRange(rm.Mutations); } if (cellMutations.Count() > 0) { rowMutation.Mutations = cellMutations; result.Add(rowMutation); } } return result; }