使用TCP / IP 套接字(Sockets)
TCP/IP 套接字提供了跨网络的低层控制。TCP/IP套接字是两台计算机之间的逻辑连接,通过它,计算机能够在任何时间发送或接收数据;这个连接一直保持,直到这台计算机显式发出关闭指令。它提供了高度的灵活性,但也带来了大量的问题,在这一章中我们会看到,因此,除非真的需要非常高度的控制,否则,最好还是使用更抽象的网络协议,在这一章的后面我们也会谈到。
为了使用TCP/IP 套接字所必须的类包含在命名空间System.Net,表 11-1 进行了汇总。
表 11-1 使用 TCP/IP 套接字需要的类
类 |
描述 |
System.Net.Sockets .TcpListener |
服务器用这个类监听入站请求。 |
System.Net.Sockets .TcpClient |
服务器和客户端都使用这个类,控制如何在网络上发送数据。 |
System.Net.Sockets .NetworkStream |
这个类用于在网络上发送和接收数据。在网络上是发送字节,因此,要发送文本,通常要打包到另外的流类型中。 |
System.IO.StreamReader |
这个类用于包装 NetworkStream 类,用来读取文本。 StreamReader 提供两个方法 ReadLine 和 ReadToEnd,以字符串形式返回流中的数据。 在 StreamReader [ 这里好像不应该是 StreamWriter ]创建时,可以使用各种不同的文本编码,由System.Text.Encoding 类的实例提供。 |
System.IO.StreamWriter |
这个类用于包装 NetworkStream 类,用于写文本。 StreamWriter 提供两个方法 Write 和 WriteLine,以字符串形式把数据写入流。 在 StreamWriter 创建时,可以使用各种不同的文本编码,由System.Text.Encoding 类的实例提供。 |
这一章的第一个例子,我们将创建一个聊天程序,有聊天服务器(见清单 11-1 )和客户端(见清单 11-2 )。聊天服务器的任务是等待并监听客户端的连接,一旦有客户端连接,必须要求客户提供用户名;还必须连续监听所有客户端的入站消息。一旦有入站消息到达,就把这个消息推给所有的客户端。客户端的任务是连接到服务器,并提供用户界面,来阅读接收到的消息,写消息并发送给其他用户。TCP/IP 连接非常适合这种类型的应用程序,因为,连接总是可用,服务器能够直接把入站消息推给客户端,而不必从客户端拉消息。
清单 11-1 聊天服务器
open System
open System.IO
open System.Net
open System.Net.Sockets
open System.Text
open System.Threading
open System.Collections.Generic
// Enhance the TcpListener class so it canhandle async connections
type System.Net.Sockets.TcpListener with
memberx.AsyncAcceptTcpClient() =
Async.FromBeginEnd(x.BeginAcceptTcpClient,x.EndAcceptTcpClient)
// Type that defines protocol forinteracting with the ClientTable
type ClientTableCommands =
|Add of (string * StreamWriter)
|Remove of string
|SendMessage of string
|ClientExists of (string * AsyncReplyChannel<bool>)
// A class that will store a list of namesof connected clients along with
// streams that allow the client to bewritten too
type ClientTable() =
//create the mail box
letmailbox = MailboxProcessor.Start(fun inbox ->
//main loop that will read messages and update the
//client name/stream writer map
letrec loop (nameMap: Map<string, StreamWriter>) =
async { let! msg = inbox.Receive()
match msg with
| Add (name, sw) ->
return! loop (Map.add name swnameMap)
| Remove name ->
return! loop (Map.remove namenameMap)
| ClientExists (name, rc) ->
rc.Reply (nameMap.ContainsKeyname)
return! loop nameMap
| SendMessage msg ->
for (_, sw) in Map.toSeq nameMapdo
try
sw.WriteLine msg
sw.Flush()
with _ -> ()
return! loop nameMap }
//start the main loop with an empty map
loopMap.empty)
///add a new client
memberx.Add(name, sw) = mailbox.Post(Add(name, sw))
///remove an existing connection
memberx.Remove(name) = mailbox.Post(Remove name)
///handles the process of sending a message to all clients
memberx.SendMessage(msg) = mailbox.Post(SendMessage msg)
///checks if a client name is taken
memberx.ClientExists(name) = mailbox.PostAndReply(fun rc -> ClientExists(name,rc))
/// perform async read on a network streampassing a continuation
/// function to handle the result
let rec asyncReadTextAndCont (stream:NetworkStream) cont =
//unfortunatly we need to specific a number of bytes to read
//this leads to any messages longer than 512 being broken into
//different messages
async{ let buffer = Array.create 512 0uy
let! read = stream.AsyncRead(buffer, 0,512)
let allText = Encoding.UTF8.GetString(buffer, 0, read)
return cont stream allText }
// class that will handle clientconnections
type Server() =
//client table to hold all incoming client details
letclients = new ClientTable()
//handles each client
lethandleClient (connection: TcpClient) =
//get the stream used to read and write from the client
letstream = connection.GetStream()
//create a stream write to more easily write to the client
letsw = new StreamWriter(stream)
//handles reading the name then starts the main loop that handles
//conversations
letrec requestAndReadName (stream: NetworkStream) (name: string) =
// read the name
let name = name.Replace(Environment.NewLine,"")
// main loop that handles conversations
let rec mainLoop (stream: NetworkStream) (msg: string) =
try
// send received message to all clients
let msg = Printf.sprintf "%s: %s" name msg
clients.SendMessage msg
with _ ->
// any error reading a message causes client to disconnect
clients.Remove name
sw.Close()
Async.Start (asyncReadTextAndCont stream mainLoop)
if clients.ClientExists(name) then
// if name exists print error and relaunch request
sw.WriteLine("ERROR - Name in use already!")
sw.Flush()
Async.Start (asyncReadTextAndCont stream requestAndReadName)
else
// name is good lanch the main loop
clients.Add(name, sw)
Async.Start (asyncReadTextAndCont stream mainLoop)
//welcome the new client by printing "What is you name?"
sw.WriteLine("Whatis your name? ");
sw.Flush()
//start the main loop that handles reading from the client
Async.Start(asyncReadTextAndCont stream requestAndReadName)
//create a tcp listener to handle incoming requests
letlistener = new TcpListener(IPAddress.Loopback, 4242)
//main loop that handles all new connections
letrec handleConnections() =
//start the listerner
listener.Start()
iflistener.Pending() then
// if there are pending connections, handle them
async { let! connection = listener.AsyncAcceptTcpClient()
printfn "New Connection"
// use a thread pool thread to handle the new request
ThreadPool.QueueUserWorkItem(fun _ ->
handleClient connection) |>ignore
// loop
return! handleConnections() }
else
// no pending connections, just loop
Thread.Sleep(1)
async { return! handleConnections() }
///allow tot
memberserver.Start() = Async.RunSynchronously (handleConnections())
// start the server class
(new Server()).Start()
我们从头开始看一下清单11-1 的程序。第一步定义一个类ClientTable,来管理连接到服务器的客户端;这是一个很好的示例,解释了如何如用信箱处理程序(MailboxProcessor)安全地在几个线程之间共享数据,这个方法与第十章“消息传递”的非常相似。我们回忆一下,信箱处理程序把非常客户端的消息进行排队,通过调用MailboxProcessor 类的 Receive 方法接收这些消息:
let! msg = inbox.Receive()
我们总是异步接收消息,这样,可以在等待消息期间不阻塞线程。提交消息给类,使用Post 方法:
mailbox.Post(Add(name, sw))
我们使用联合类型来定义发关和接收的消息。在这里,我们定义了四种操作,分别是Add、Remove、Current、SendMessage 和 ClientExists:
type ClientTableCommands =
|Add of (string * StreamWriter)
|Remove of string
|SendMessage of string
|ClientExists of (string * AsyncReplyChannel<bool>)
这些操作用模式匹配实现,根据接收到的消息,它在专门负责接收消息的异步工作流中,通常,我们使用无限递归的循环连续地读消息。在这个示例中,这个函数是 loop,loop 有一个参数,它是 F# 中不可变的 Map 类,负责在用字符串表示的客户端名字与用StreamWriter 表示的到这个客户端的连接之间的映射:
let rec loop (nameMap: Map<string,StreamWriter>) =
...
这种很好地实现了操作之间的状态共享。操作先更新这个映射,然后,再传递一个新的实例给下一次循环。Add 和Remove 的操作实现很简单,创建一个更新过的新映射,然后,传递给下一次循环。下面的代码只展示了 Add 操作,因为 Remove 操作非常相似:
| Add (name, sw) ->
return! loop (Map.add name sw nameMap)
ClientExists 操作有更有趣一点,因为必须要返回一个结果,对此,使用 AsyncReplyChannel(异步应答通道),它包含在ClientExists 联合的情况中:
| ClientExists (name, rc) ->
rc.Reply (nameMap.ContainsKey name)
return! loop nameMap
把消息传递给MailboxProcessor 类,是通过使用它的 PostAndReply方法,注意,不是前面看到过的 Post 方法,应答通道被加载到联合的情况中:
mailbox.PostAndReply(fun rc -> ClientExists(name,rc))
可能最有趣的操作是SendMessage,需要枚举出所有的客户端,然后把消息传递给它们。执行这个在MailboxProcessor 类当中,因为这个类实现了一个排队系统,这样,就能保证只有一个消息传递给所有的客户端只有一次;这种方法还保证了一个消息文本不会和其他消息混在一起,以及消息的到达顺序不会改变:
| SendMessage msg ->
for(_, sw) in Map.to_seq nameMap do
try
sw.WriteLine msg
sw.Flush()
with _ -> ()
下面,我们将看到代码中最困难的部分:如何有效地从连接的客户端读消息。要有效地读消息,必须使用异步的方式读,才能保证宝贵的服务器线程不会在等待客户端期间被阻塞,因为客户端发送消息相对并不频繁。F# 通过使用异步工作流,定代码已经很容易了;然而,要想让 F# 的异步工作流运行在最好的状态,还必须有大量的操作能够并发执行。在这里,我们想重复地执行一个异步操作,这是可能的,但有点复杂,因为我们必须使用连续的进行传递(continuation style passing)。我们定义一个函数 asyncReadTextAndCont,它异步地读网络流,并这个结果字符串和原始的网络流传递给它的连续函数(continuation
function)。这里的连续函数是 cont:
/// perform async read on a network streampassing a continuation
/// function to handle the result
let rec asyncReadTextAndCont (stream:NetworkStream) cont =
async { let buffer = Array.create 512 0uy
let! read = stream.AsyncRead(buffer, 0, 512)
let allText = acc + Encoding.UTF8.GetString(buffer, 0, read)
return cont stream allText }
所以要注意这个函数的重要的一点是,当读取发生,物理线程将从函数返回,并可能返回到线程池;然而,我们不必担心物理线程太多的问题,因为当异步输入输出操作完成时,它会重新启动,结果会传递给 cont 函数。
然后,使用这个函数执行读客户端的所有任务,例如,主递归循环可能像这样:
let rec mainLoop (stream: NetworkStream)(msg: string) =
try
// send received message to all clients
let msg = Printf.sprintf "%s: %s" name msg
clients.SendMessage msg
with _ ->
// any error reading a message causes client to disconnect
clients.Remove name
sw.Close()
Async.Start (asyncReadTextAndCont stream mainLoop)
把接收到的 msg 字符串作为消息,执行发送操作;然后,使用asyncReadTextAndCont 函数进行递归循环,把 mainLoop 函数作为一个参数传递给它,再使用 Async.Start 函数发送消息,以fire-and-forget (启动后就不管了)模式启动异步工作流,就是说,它不会阻塞,并等待工作流的完成。
接着,创建TcpListener 类的实例。这个类是完成监听入站连接工作的,通常用被监听服务器的 IP 地址和端口号来初始化;当启动监听器时,告诉它监听的IP 地址。通常,它监听和这台计算机上网卡相关的所有 IP 地址的所有通信;然而,这仅是一个演示程序,因此,告诉TcpListener 类监听IPAddress.Loopback,表示只选取本地计算机的请求。使用端口号是判断网络通信只为这个应用程序服务,而不是别的。TcpListener 类一次只允许一个监听器监听一个端口。端口号的选择有点随意性,但要大于
1023,因为端口号 0 到 1023 是保留给专门的应用程序的。因此,我们在最后定义的函数handleConnections 中,使用 TcpListener 实例创建的监听器端口4242:
let listener = new TcpListener(IPAddress.Loopback,4242)
[
原文中为 server,就是来自上个版本,未做修改。
另外,本程序与原来的版本相比,作了较大的修改,或者说,是完全重写。
]
这个函数是个无限循环,它监听新的客户端连接,并创建新的线程来管理。看下面的代码,一旦有连接,就能检索出这个连接的实例,在新的线程池线程上启动管理它的工作。
let! connection =listener.AsyncAcceptTcpClient()
printfn "New Connection"
// use a thread pool thread to handle thenew request
ThreadPool.QueueUserWorkItem(fun _ ->handleClient connection) |> ignore
现在,我们知道了服务器是如何工作的,下面要看看客户端了,它在很多方面比服务器简单得多。清单11-2 是客户端的完整代码,注意,要引用Systems.Windows.Forms.dll 才能编译;在清单的后面是相关的代码讨论。
清单 11-2 聊天客户端
open System
open System.ComponentModel
open System.IO
open System.Net.Sockets
open System.Threading
open System.Windows.Forms
let form =
//create the form
letform = new Form(Text = "F# Talk Client")
//text box to show the messages received
letoutput =
newTextBox(Dock = DockStyle.Fill,
ReadOnly = true,
Multiline = true)
form.Controls.Add(output)
//text box to allow the user to send messages
letinput = new TextBox(Dock = DockStyle.Bottom, Multiline = true)
form.Controls.Add(input)
//create a new tcp client to handle the network connections
lettc = new TcpClient()
tc.Connect("localhost",4242)
//loop that handles reading from the tcp client
letload() =
letrun() =
let sr = new StreamReader(tc.GetStream())
while(true) do
let text = sr.ReadLine()
if text <> null && text <> "" then
// we need to invoke back to the "gui thread"
// to be able to safely interact with the controls
form.Invoke(new MethodInvoker(fun () ->
output.AppendText(text + Environment.NewLine)
output.SelectionStart <- output.Text.Length))
|> ignore
//create a new thread to run this loop
lett = new Thread(new ThreadStart(run))
t.Start()
//start the loop that handles reading from the tcp client
//when the form has loaded
form.Load.Add(fun_ -> load())
letsw = new StreamWriter(tc.GetStream())
//handles the key up event - if the user has entered a line
//of text then send the message to the server
letkeyUp () =
if(input.Lines.Length> 1) then
let text = input.Text
if (text <> null && text <> "") then
try
sw.WriteLine(text)
sw.Flush()
with err ->
MessageBox.Show(sprintf"Server error\n\n%O" err)
|> ignore
input.Text <- ""
//wire up the key up event handler
input.KeyUp.Add(fun_ -> keyUp ())
//when the form closes it‘s necessary to explicitly exit the app
//as there are other threads running in the back ground
form.Closing.Add(fun_ ->
Application.Exit()
Environment.Exit(0))
//return the form to the top level
form
// show the form and start the apps eventloop
[<STAThread>]
do Application.Run(form)
运行前面的代码,产生如图11-2 所示的客户端服务器程序。
图 11-2 聊天客户端服务器程序
现在我们就来看一下清单11-2 中的客户端是如何工作的。代码的第一部分完成窗体各部分的初始化,这不是我们现在感兴趣的,有关Windows 窗体程序工作原理的详细内容可以回头看第八章。清单11-2 中与TCP/IP 套接字编程相关的第一部分是连接服务器,通过创建TcpClient 类的实例,然后调用它的Connect 方法:
let tc = new TcpClient()
tc.Connect("localhost", 4242)
在这里,我们指定localhost,即表示本地计算机,端口 4242,与服务器监听的端口相同。在更实际的例子中,可以用服务器的 DNS 名称,或者由让用户指定 DNS 名。因为我们在同一台计算机上运行这个示例程序,localhost 也是不错的选择。
Load 函数负责从服务器读取数据,把它附加到窗体的 Load 事件,是为了保证窗体装载并初始化完成后就能运行,我们需要与窗体的控件进行交互:
form.Load.Add(fun _ -> load())
[
原文中是temp,有误。
]
为了保证及时读取服务器上的所有数据,需要创建一个新的线程去读所有的入站请求;先定义函数run,然后,使用它启动一个新线程:
let t = new Thread(new ThreadStart(run))
t.Start()
在run 的定义中,先创建StreamReader,从连接中读文本;然后,使用无限循环,这样,保证线程不退出,能够连续从连接中读数据。发现数据之后,要用窗体的Invoke 方法更新窗体,这是因为不能从创建这个窗体之外的线程中更新窗体:
form.Invoke(new MethodInvoker(fun () ->
output.AppendText(text+ Environment.NewLine)
output.SelectionStart<- output.Text.Length))
[
原文中是temp,有误。
]
客户端的另外一部分,也是重要功能,是把消息写到服务器,这是在keyUp 函数中完成的,它被附加到输入文本框(input)的KeyUp 事件,这样,在文本框中每按一次键,下面的代码会触发:
input.KeyUp.Add(fun _ -> keyUp () )
keyUp 函数的实现是非常简单,发现超过一行,表示已按过 Enter 键,就通过网络发送所有可用的文本,并清除文本框。
现在,我们已经知道如何实现客户端和服务器了,再看看有关这个应用程序的一般问题。在清单11-1 和 11-2 中,在每次网络操作后都调用了Flush();否则,要等到流的缓存已满,数据才会通过网络传输,这会导致一个用户必须输入很多消息,才能出现在其他用户的屏幕上。
这种方法也有一些问题,特别是在服务器端。为每个入站的客户端分配一个线程,保证了对每个客户端能有很好的响应;但是,随着客户连接数的增加,对这些线程需要进行上下文切换(context switching,参见第十章“线程、内存、锁定和阻塞”一节)的数量也会增加,这样,服务器的整体性能就会下降。另外,每个客户端都要有它自己的线程,因此,客户端的最大数就受限于进程所能包含的最大线程数。这些问题是可以解决的,但是,通常简单的办法,是使用一些更加抽象的协议,下一节会有讨论。
使用 TCP / IP 套接字(Sockets)