-
Notifications
You must be signed in to change notification settings - Fork 463
/
DeviceClientMessageConverter.cs
126 lines (104 loc) · 5.71 KB
/
DeviceClientMessageConverter.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.Devices.Edge.Hub.CloudProxy
{
using System;
using System.Collections.Generic;
using System.Globalization;
using Microsoft.Azure.Devices.Client;
using Microsoft.Azure.Devices.Edge.Hub.Core;
using Microsoft.Azure.Devices.Edge.Util;
/// <summary>
/// Converts between the Edge Hub <see cref="IMessage"/> representation and the
/// device client <see cref="Message"/> type.
/// </summary>
public class DeviceClientMessageConverter : IMessageConverter<Message>
{
    // Same Value as IotHub
    static readonly TimeSpan ClockSkewAdjustment = TimeSpan.FromSeconds(30);

    /// <summary>
    /// Builds a device client <see cref="Message"/> from an Edge Hub message.
    /// The body and all application properties are copied; system properties are
    /// copied only when they are present with a non-empty value.
    /// </summary>
    /// <param name="inputMessage">The message to convert. Must not be null and must have a non-null body.</param>
    /// <returns>A new <see cref="Message"/> carrying the body and the mapped properties.</returns>
    public Message FromMessage(IMessage inputMessage)
    {
        Preconditions.CheckNotNull(inputMessage, nameof(inputMessage));
        Preconditions.CheckArgument(inputMessage.Body != null, "IMessage.Body should not be null");

        var message = new Message(inputMessage.Body);

        if (inputMessage.Properties != null)
        {
            foreach (KeyValuePair<string, string> inputMessageProperty in inputMessage.Properties)
            {
                message.Properties.Add(inputMessageProperty);
            }
        }

        if (inputMessage.SystemProperties != null)
        {
            if (inputMessage.SystemProperties.TryGetNonEmptyValue(SystemProperties.MessageId, out string messageId))
            {
                message.MessageId = messageId;
            }

            if (inputMessage.SystemProperties.TryGetNonEmptyValue(SystemProperties.MsgCorrelationId, out string correlationId))
            {
                message.CorrelationId = correlationId;
            }

            if (inputMessage.SystemProperties.TryGetNonEmptyValue(SystemProperties.UserId, out string userId))
            {
                message.UserId = userId;
            }

            if (inputMessage.SystemProperties.TryGetNonEmptyValue(SystemProperties.ContentType, out string contentType))
            {
                message.ContentType = contentType;
            }

            if (inputMessage.SystemProperties.TryGetNonEmptyValue(SystemProperties.ContentEncoding, out string contentEncoding))
            {
                message.ContentEncoding = contentEncoding;
            }

            if (inputMessage.SystemProperties.TryGetNonEmptyValue(SystemProperties.To, out string to))
            {
                message.To = to;
            }

            if (inputMessage.SystemProperties.TryGetNonEmptyValue(SystemProperties.CreationTime, out string creationTime))
            {
                // RoundtripKind preserves the DateTimeKind encoded in the "o" string.
                // Without it, a "Z"-suffixed (UTC) value is converted to local time
                // before being stored in a property that is meant to hold UTC.
                message.CreationTimeUtc = DateTime.ParseExact(creationTime, "o", CultureInfo.InvariantCulture, DateTimeStyles.RoundtripKind);
            }

            if (inputMessage.SystemProperties.TryGetNonEmptyValue(SystemProperties.MessageSchema, out string messageSchema))
            {
                message.MessageSchema = messageSchema;
            }

            if (inputMessage.SystemProperties.TryGetNonEmptyValue(SystemProperties.ComponentName, out string componentName))
            {
                message.ComponentName = componentName;
            }

            // Only the security-message interface id is acted upon; any other
            // interface id is ignored here.
            if (inputMessage.SystemProperties.TryGetNonEmptyValue(SystemProperties.InterfaceId, out string interfaceId)
                && interfaceId.Equals(Constants.SecurityMessageIoTHubInterfaceId, StringComparison.OrdinalIgnoreCase))
            {
                message.SetAsSecurityMessage();
            }
        }

        return message;
    }

    /// <summary>
    /// Builds an Edge Hub <see cref="IMessage"/> from a device client <see cref="Message"/>.
    /// The body and application properties are copied, and the source message's
    /// metadata is written into the system properties as strings.
    /// </summary>
    /// <param name="sourceMessage">The message to convert. Must not be null.</param>
    /// <returns>A new Edge Hub message with the mapped system properties.</returns>
    public IMessage ToMessage(Message sourceMessage)
    {
        // Validate up front, as FromMessage does, so a null argument is reported
        // by name instead of failing on the first member access.
        Preconditions.CheckNotNull(sourceMessage, nameof(sourceMessage));

        EdgeMessage message = new EdgeMessage.Builder(sourceMessage.GetBytes())
            .SetProperties(sourceMessage.Properties)
            .Build();

        message.SystemProperties.AddIfNonEmpty(SystemProperties.MessageId, sourceMessage.MessageId);
        message.SystemProperties.AddIfNonEmpty(SystemProperties.MsgCorrelationId, sourceMessage.CorrelationId);
        message.SystemProperties.AddIfNonEmpty(SystemProperties.UserId, sourceMessage.UserId);
        message.SystemProperties.AddIfNonEmpty(SystemProperties.ContentType, sourceMessage.ContentType);
        message.SystemProperties.AddIfNonEmpty(SystemProperties.ContentEncoding, sourceMessage.ContentEncoding);
        message.SystemProperties.AddIfNonEmpty(SystemProperties.To, sourceMessage.To);
        message.SystemProperties.AddIfNonEmpty(SystemProperties.MessageSchema, sourceMessage.MessageSchema);
        message.SystemProperties.AddIfNonEmpty(SystemProperties.LockToken, sourceMessage.LockToken);

        // Numbers and timestamps are formatted with the invariant culture so the
        // stored strings do not depend on the culture of the running thread.
        message.SystemProperties.AddIfNonEmpty(SystemProperties.DeliveryCount, sourceMessage.DeliveryCount.ToString(CultureInfo.InvariantCulture));
        message.SystemProperties.AddIfNonEmpty(SystemProperties.ComponentName, sourceMessage.ComponentName);

        // A sequence number of zero is not recorded.
        if (sourceMessage.SequenceNumber > 0)
        {
            message.SystemProperties.AddIfNonEmpty(SystemProperties.SequenceNumber, sourceMessage.SequenceNumber.ToString(CultureInfo.InvariantCulture));
        }

        // When the source carries no enqueued time (DateTime.MinValue) the current
        // UTC time is used; otherwise the source value is shifted by ClockSkewAdjustment.
        DateTime enqueuedTime = sourceMessage.EnqueuedTimeUtc == DateTime.MinValue
            ? DateTime.UtcNow
            : sourceMessage.EnqueuedTimeUtc.Add(ClockSkewAdjustment);
        message.SystemProperties.Add(SystemProperties.EnqueuedTime, enqueuedTime.ToString("o", CultureInfo.InvariantCulture));

        if (sourceMessage.ExpiryTimeUtc > DateTime.MinValue)
        {
            message.SystemProperties.Add(SystemProperties.ExpiryTimeUtc, sourceMessage.ExpiryTimeUtc.ToString("o", CultureInfo.InvariantCulture));
        }

        if (sourceMessage.CreationTimeUtc > DateTime.MinValue)
        {
            message.SystemProperties.Add(SystemProperties.CreationTime, sourceMessage.CreationTimeUtc.ToString("o", CultureInfo.InvariantCulture));
        }

        return message;
    }
}
}